This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch feat/cache-jcasbin-id-mapping
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 32aa12e3975691bb9cd4e9f0962bf3d5f60f06b8
Author: yuqi <[email protected]>
AuthorDate: Fri Apr 17 02:39:13 2026 +0800

    feat(cache): add id-mapping caches and change poller to JcasbinAuthorizer
    
    Introduce version-validated caches (userRoleCache, groupRoleCache,
    loadedRoles with updated_at) and eventual-consistency caches
    (metadataIdCache, ownerRelCache) backed by GravitinoCache. Implement
    4-step auth flow with group role loading, batch role version check,
    and a ScheduledExecutorService poller for owner and entity structural
    changes. Add handleEntityStructuralChange to GravitinoAuthorizer
    interface and new config entries for metadata-id cache size and
    change poll interval.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../main/java/org/apache/gravitino/Configs.java    |  18 +
 .../authorization/GravitinoAuthorizer.java         |  14 +
 .../authorization/jcasbin/CachedGroupRoles.java    |  51 ++
 .../authorization/jcasbin/CachedUserRoles.java     |  51 ++
 .../authorization/jcasbin/JcasbinAuthorizer.java   | 672 ++++++++++++++++-----
 .../jcasbin/TestJcasbinAuthorizerCacheHelpers.java | 185 ++++++
 6 files changed, 838 insertions(+), 153 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 3cc9d703e6..7a35f738ad 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -329,6 +329,24 @@ public class Configs {
           .longConf()
           .createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_OWNER_CACHE_SIZE);
 
+  public static final long 
DEFAULT_GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE = 100000L;
+
+  public static final ConfigEntry<Long> 
GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE =
+      new ConfigBuilder("gravitino.authorization.jcasbin.metadataIdCacheSize")
+          .doc("The maximum size of the metadata-id cache for authorization")
+          .version(ConfigConstants.VERSION_1_3_0)
+          .longConf()
+          
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE);
+
+  public static final long 
DEFAULT_GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS = 30L;
+
+  public static final ConfigEntry<Long> 
GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS =
+      new 
ConfigBuilder("gravitino.authorization.jcasbin.changePollIntervalSecs")
+          .doc("The interval in seconds for polling entity and owner changes")
+          .version(ConfigConstants.VERSION_1_3_0)
+          .longConf()
+          
.createWithDefault(DEFAULT_GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS);
+
   public static final ConfigEntry<List<String>> SERVICE_ADMINS =
       new ConfigBuilder("gravitino.authorization.serviceAdmins")
           .doc("The admins of Gravitino service")
diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
index 7965f173ad..81a86a4bd2 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/GravitinoAuthorizer.java
@@ -164,4 +164,18 @@ public interface GravitinoAuthorizer extends Closeable {
    */
   void handleMetadataOwnerChange(
       String metalake, Long oldOwnerId, NameIdentifier nameIdentifier, 
Entity.EntityType type);
+
+  /**
+   * Called when an entity undergoes a structural change (rename or drop) that 
invalidates cached
+   * name-to-id mappings in the metadataIdCache. The authorizer evicts the 
cache key for the given
+   * entity and all of its descendants (cascade invalidation).
+   *
+   * @param metalake the metalake name
+   * @param nameIdentifier the entity name identifier
+   * @param type the entity type
+   */
+  default void handleEntityStructuralChange(
+      String metalake, NameIdentifier nameIdentifier, Entity.EntityType type) {
+    // default no-op for backward compatibility
+  }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/CachedGroupRoles.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/CachedGroupRoles.java
new file mode 100644
index 0000000000..8cb5191d4c
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/CachedGroupRoles.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import java.util.List;
+
+/**
+ * Cached snapshot of a group's role assignments. The {@code updatedAt} 
timestamp corresponds to the
+ * {@code group_meta.updated_at} column and is used as a version sentinel: if 
the DB value is newer,
+ * the cached role list is stale and must be reloaded.
+ */
+public class CachedGroupRoles {
+
+  private final long groupId;
+  private final long updatedAt;
+  private final List<Long> roleIds;
+
+  public CachedGroupRoles(long groupId, long updatedAt, List<Long> roleIds) {
+    this.groupId = groupId;
+    this.updatedAt = updatedAt;
+    this.roleIds = roleIds;
+  }
+
+  public long getGroupId() {
+    return groupId;
+  }
+
+  public long getUpdatedAt() {
+    return updatedAt;
+  }
+
+  public List<Long> getRoleIds() {
+    return roleIds;
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/CachedUserRoles.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/CachedUserRoles.java
new file mode 100644
index 0000000000..7207fcc692
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/CachedUserRoles.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import java.util.List;
+
+/**
+ * Cached snapshot of a user's direct role assignments. The {@code updatedAt} 
timestamp corresponds
+ * to the {@code user_meta.updated_at} column and is used as a version 
sentinel: if the DB value is
+ * newer, the cached role list is stale and must be reloaded.
+ */
+public class CachedUserRoles {
+
+  private final long userId;
+  private final long updatedAt;
+  private final List<Long> roleIds;
+
+  public CachedUserRoles(long userId, long updatedAt, List<Long> roleIds) {
+    this.userId = userId;
+    this.updatedAt = updatedAt;
+    this.roleIds = roleIds;
+  }
+
+  public long getUserId() {
+    return userId;
+  }
+
+  public long getUpdatedAt() {
+    return updatedAt;
+  }
+
+  public List<Long> getRoleIds() {
+    return roleIds;
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index 7630f71ae3..8ec85a307d 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -17,8 +17,7 @@
 
 package org.apache.gravitino.server.authorization.jcasbin;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
@@ -28,13 +27,13 @@ import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Configs;
@@ -44,18 +43,31 @@ import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.SupportsRelationOperations;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.authorization.AuthorizationRequestContext;
 import org.apache.gravitino.authorization.AuthorizationUtils;
 import org.apache.gravitino.authorization.GravitinoAuthorizer;
 import org.apache.gravitino.authorization.Privilege;
 import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.cache.CaffeineGravitinoCache;
+import org.apache.gravitino.cache.GravitinoCache;
 import org.apache.gravitino.exceptions.NoSuchUserException;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.server.authorization.MetadataIdConverter;
-import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.storage.relational.mapper.EntityChangeLogMapper;
+import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.UserMetaMapper;
+import org.apache.gravitino.storage.relational.po.RolePO;
+import org.apache.gravitino.storage.relational.po.auth.ChangedOwnerInfo;
+import org.apache.gravitino.storage.relational.po.auth.EntityChangeRecord;
+import org.apache.gravitino.storage.relational.po.auth.GroupAuthInfo;
+import org.apache.gravitino.storage.relational.po.auth.OwnerInfo;
+import org.apache.gravitino.storage.relational.po.auth.RoleUpdatedAt;
+import org.apache.gravitino.storage.relational.po.auth.UserAuthInfo;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.casbin.jcasbin.main.Enforcer;
@@ -69,6 +81,12 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(JcasbinAuthorizer.class);
 
+  /** Key separator for hierarchical cache keys. */
+  static final String KEY_SEP = "::";
+
+  /** Max rows to fetch per poller cycle. */
+  private static final int POLLER_MAX_ROWS = 500;
+
   /** Jcasbin enforcer is used for metadata authorization. */
   private Enforcer allowEnforcer;
 
@@ -81,15 +99,43 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
   /** deny internal authorizer */
   private InternalAuthorizer denyInternalAuthorizer;
 
+  // ---- Version-validated caches (strong consistency) ----
+
+  /**
+   * userRoleCache: metalake::userName -> CachedUserRoles. Version-validated 
per request via
+   * user_meta.updated_at.
+   */
+  private GravitinoCache<String, CachedUserRoles> userRoleCache;
+
+  /**
+   * groupRoleCache: metalake::groupId -> CachedGroupRoles. Version-validated 
per request via
+   * group_meta.updated_at.
+   */
+  private GravitinoCache<String, CachedGroupRoles> groupRoleCache;
+
   /**
-   * loadedRoles is used to cache roles that have loaded permissions. When the 
permissions of a role
-   * are updated, they should be removed from it.
+   * loadedRoles: roleId -> updated_at. If the DB updated_at is newer, evict 
and reload policies.
    */
-  private Cache<Long, Boolean> loadedRoles;
+  private GravitinoCache<Long, Long> loadedRoles;
 
-  private Cache<Long, Optional<Long>> ownerRel;
+  // ---- Eventual consistency caches (poller-driven) ----
 
-  private Executor executor = null;
+  /**
+   * metadataIdCache: hierarchical key 
(metalake::catalog::schema::table::TYPE) -> entity id.
+   * Evicted by entity change poller.
+   */
+  private GravitinoCache<String, Long> metadataIdCache;
+
+  /** ownerRelCache: metadataObjectId -> Optional(ownerId). Evicted by owner 
change poller. */
+  private GravitinoCache<Long, Optional<Long>> ownerRelCache;
+
+  /** Scheduled poller for owner and entity structural changes. */
+  private ScheduledExecutorService changePoller;
+
+  /** High-water marks for pollers. */
+  private volatile long ownerPollHighWaterMark = 0;
+
+  private volatile long entityPollHighWaterMark = 0;
 
   @Override
   public void initialize() {
@@ -101,41 +147,48 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         
GravitinoEnv.getInstance().config().get(Configs.GRAVITINO_AUTHORIZATION_ROLE_CACHE_SIZE);
     long ownerCacheSize =
         
GravitinoEnv.getInstance().config().get(Configs.GRAVITINO_AUTHORIZATION_OWNER_CACHE_SIZE);
+    long metadataIdCacheSize =
+        GravitinoEnv.getInstance()
+            .config()
+            .get(Configs.GRAVITINO_AUTHORIZATION_METADATA_ID_CACHE_SIZE);
+    long pollIntervalSecs =
+        GravitinoEnv.getInstance()
+            .config()
+            .get(Configs.GRAVITINO_AUTHORIZATION_CHANGE_POLL_INTERVAL_SECS);
+
+    long ttlMs = cacheExpirationSecs * 1000L;
 
-    // Initialize enforcers before the caches that reference them in removal 
listeners
+    // Initialize enforcers before caches that reference them in removal 
listeners
     allowEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
     allowInternalAuthorizer = new InternalAuthorizer(allowEnforcer);
     denyEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
     denyInternalAuthorizer = new InternalAuthorizer(denyEnforcer);
 
-    loadedRoles =
-        Caffeine.newBuilder()
-            .expireAfterAccess(cacheExpirationSecs, TimeUnit.SECONDS)
-            .maximumSize(roleCacheSize)
-            .executor(Runnable::run)
-            .removalListener(
-                (roleId, value, cause) -> {
-                  if (roleId != null) {
-                    allowEnforcer.deleteRole(String.valueOf(roleId));
-                    denyEnforcer.deleteRole(String.valueOf(roleId));
-                  }
-                })
-            .build();
-    ownerRel =
-        Caffeine.newBuilder()
-            .expireAfterAccess(cacheExpirationSecs, TimeUnit.SECONDS)
-            .maximumSize(ownerCacheSize)
-            .build();
-    executor =
-        Executors.newFixedThreadPool(
-            GravitinoEnv.getInstance()
-                .config()
-                .get(Configs.GRAVITINO_AUTHORIZATION_THREAD_POOL_SIZE),
-            runnable -> {
-              Thread thread = new Thread(runnable);
-              thread.setName("GravitinoAuthorizer-ThreadPool-" + 
thread.getId());
-              return thread;
+    // loadedRoles: roleId -> updated_at.
+    // When evicted, we must clean up the corresponding JCasbin policies.
+    loadedRoles = new LoadedRolesCache(ttlMs, roleCacheSize, allowEnforcer, 
denyEnforcer);
+
+    userRoleCache = new CaffeineGravitinoCache<>(ttlMs, roleCacheSize);
+    groupRoleCache = new CaffeineGravitinoCache<>(ttlMs, roleCacheSize);
+    metadataIdCache = new CaffeineGravitinoCache<>(ttlMs, metadataIdCacheSize);
+    ownerRelCache = new CaffeineGravitinoCache<>(ttlMs, ownerCacheSize);
+
+    // Initialize high-water marks to current time so we only pick up future 
changes
+    long now = System.currentTimeMillis();
+    ownerPollHighWaterMark = now;
+    entityPollHighWaterMark = now;
+
+    // Start the change poller
+    changePoller =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread t = new Thread(r);
+              t.setName("GravitinoAuthorizer-ChangePoller");
+              t.setDaemon(true);
+              return t;
             });
+    changePoller.scheduleWithFixedDelay(
+        this::pollChanges, pollIntervalSecs, pollIntervalSecs, 
TimeUnit.SECONDS);
   }
 
   private Model getModel(String modelFilePath) {
@@ -150,6 +203,10 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     return model;
   }
 
+  // 
---------------------------------------------------------------------------
+  //  Authorize / deny / isOwner
+  // 
---------------------------------------------------------------------------
+
   @Override
   public boolean authorize(
       Principal principal,
@@ -218,15 +275,14 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       String metalake,
       MetadataObject metadataObject,
       AuthorizationRequestContext requestContext) {
-    Long userId;
     boolean result;
     try {
-      Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      loadOwnerPolicy(metalake, metadataObject, metadataId);
+      Long metadataId = resolveMetadataId(metadataObject, metalake);
+      loadOwnerPolicy(metadataId);
       UserEntity userEntity = getUserEntity(principal.getName(), metalake);
-      userId = userEntity.id();
-      metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      result = Objects.equals(Optional.of(userId), 
ownerRel.getIfPresent(metadataId));
+      Long userId = userEntity.id();
+      Optional<Optional<Long>> ownerOpt = 
ownerRelCache.getIfPresent(metadataId);
+      result = ownerOpt.isPresent() && Objects.equals(Optional.of(userId), 
ownerOpt.get());
     } catch (Exception e) {
       LOG.debug("Can not get entity id", e);
       result = false;
@@ -283,7 +339,7 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
             entityStore
                 .relationOperations()
                 .listEntitiesByRelation(
-                    SupportsRelationOperations.Type.ROLE_USER_REL,
+                    
org.apache.gravitino.SupportsRelationOperations.Type.ROLE_USER_REL,
                     userNameIdentifier,
                     Entity.EntityType.USER);
         return entities.stream().anyMatch(roleEntity -> 
Objects.equals(roleEntity.id(), roleId));
@@ -306,14 +362,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     if (isOwner(currentPrincipal, metalake, metalakeObject, requestContext)) {
       return true;
     }
-    MetadataObject.Type metadataType = 
MetadataObject.Type.valueOf(type.toUpperCase());
+    MetadataObject.Type metadataType = 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT));
     MetadataObject metadataObject =
         MetadataObjects.of(Arrays.asList(fullName.split("\\.")), metadataType);
     do {
       if (isOwner(currentPrincipal, metalake, metadataObject, requestContext)) 
{
         MetadataObject.Type tempType = metadataObject.type();
         if (tempType == MetadataObject.Type.SCHEMA) {
-          // schema owner need use catalog privilege
           boolean hasCatalogUseCatalog =
               authorize(
                   currentPrincipal,
@@ -335,7 +390,6 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
             || tempType == MetadataObject.Type.TOPIC
             || tempType == MetadataObject.Type.FILESET
             || tempType == MetadataObject.Type.MODEL) {
-          // table owner need use_catalog and use_schema privileges
           boolean hasMetalakeUseSchema =
               authorize(
                   currentPrincipal,
@@ -362,7 +416,6 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         }
         return true;
       }
-      // metadata parent owner can set owner.
     } while ((metadataObject = MetadataObjects.parent(metadataObject)) != 
null);
     return false;
   }
@@ -371,17 +424,12 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
   public boolean hasMetadataPrivilegePermission(
       String metalake, String type, String fullName, 
AuthorizationRequestContext requestContext) {
     Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
-    // Check whether the principal holds MANAGE_GRANTS on the target object or 
any ancestor.
-    // A grant at a broader level (e.g. CATALOG or SCHEMA) implicitly covers 
all objects beneath it.
     MetadataObject.Type metadataType;
     try {
-      metadataType = MetadataObject.Type.valueOf(type.toUpperCase());
+      metadataType = 
MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT));
     } catch (IllegalArgumentException e) {
       throw new IllegalArgumentException("Unknown metadata object type: " + 
type, e);
     }
-    // Build the full ancestor chain from the target object up to and 
including the metalake.
-    // MetadataObjects.parent(CATALOG) returns null (CATALOG is a root in the 
parent API), so the
-    // metalake is appended manually at the end.
     List<MetadataObject> chain = new ArrayList<>();
     for (MetadataObject obj = MetadataObjects.parse(fullName, metadataType);
         obj != null;
@@ -399,6 +447,10 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     return hasSetOwnerPermission(metalake, type, fullName, requestContext);
   }
 
+  // 
---------------------------------------------------------------------------
+  //  Cache invalidation hooks (called from service layer)
+  // 
---------------------------------------------------------------------------
+
   @Override
   public void handleRolePrivilegeChange(Long roleId) {
     loadedRoles.invalidate(roleId);
@@ -409,19 +461,56 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       String metalake, Long oldOwnerId, NameIdentifier nameIdentifier, 
Entity.EntityType type) {
     MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
     Long metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-    ownerRel.invalidate(metadataId);
+    ownerRelCache.invalidate(metadataId);
+  }
+
+  @Override
+  public void handleEntityStructuralChange(
+      String metalake, NameIdentifier nameIdentifier, Entity.EntityType type) {
+    MetadataObject metadataObject = 
NameIdentifierUtil.toMetadataObject(nameIdentifier, type);
+    String cacheKey = buildCacheKey(metalake, metadataObject);
+    if (isNonLeaf(metadataObject.type())) {
+      // Cascade invalidation: metalake::catalog:: prefix removes catalog + 
all children
+      metadataIdCache.invalidateByPrefix(cacheKey);
+    } else {
+      metadataIdCache.invalidate(cacheKey);
+    }
   }
 
   @Override
   public void close() throws IOException {
-    if (executor != null) {
-      if (executor instanceof ThreadPoolExecutor) {
-        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
-        threadPoolExecutor.shutdown();
+    if (changePoller != null) {
+      changePoller.shutdown();
+      try {
+        if (!changePoller.awaitTermination(5, TimeUnit.SECONDS)) {
+          changePoller.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        changePoller.shutdownNow();
+        Thread.currentThread().interrupt();
       }
     }
+    if (userRoleCache != null) {
+      userRoleCache.close();
+    }
+    if (groupRoleCache != null) {
+      groupRoleCache.close();
+    }
+    if (loadedRoles != null) {
+      loadedRoles.close();
+    }
+    if (metadataIdCache != null) {
+      metadataIdCache.close();
+    }
+    if (ownerRelCache != null) {
+      ownerRelCache.close();
+    }
   }
 
+  // 
---------------------------------------------------------------------------
+  //  Internal authorizer
+  // 
---------------------------------------------------------------------------
+
   private class InternalAuthorizer {
 
     Enforcer enforcer;
@@ -449,13 +538,24 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
       Long metadataId;
       Long userId;
       try {
-        UserEntity userEntity = getUserEntity(username, metalake);
-        userId = userEntity.id();
-        metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+        // Step 1a: get userId + user updated_at (version sentinel)
+        UserAuthInfo userInfo =
+            SessionUtils.getWithoutCommit(
+                UserMetaMapper.class, m -> m.getUserInfo(metalake, username));
+        if (userInfo == null) {
+          LOG.debug("User {} not found in metalake {}", username, metalake);
+          return false;
+        }
+        userId = userInfo.getUserId();
+
+        // Step 2: resolve metadata name -> id via cache
+        metadataId = resolveMetadataId(metadataObject, metalake);
       } catch (Exception e) {
         LOG.debug("Can not get entity id", e);
         return false;
       }
+
+      // Step 1 + 3: load roles for user and groups, version-validated
       loadRolePrivilege(metalake, username, userId, requestContext);
       return authorizeByJcasbin(userId, metadataObject, metadataId, privilege);
     }
@@ -463,8 +563,8 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     private boolean authorizeByJcasbin(
         Long userId, MetadataObject metadataObject, Long metadataId, String 
privilege) {
       if (AuthConstants.OWNER.equals(privilege)) {
-        Optional<Long> owner = ownerRel.getIfPresent(metadataId);
-        return Objects.equals(Optional.of(userId), owner);
+        Optional<Optional<Long>> ownerOpt = 
ownerRelCache.getIfPresent(metadataId);
+        return ownerOpt.isPresent() && Objects.equals(Optional.of(userId), 
ownerOpt.get());
       }
       return enforcer.enforce(
           String.valueOf(userId),
@@ -474,131 +574,397 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     }
   }
 
-  private static UserEntity getUserEntity(String username, String metalake) 
throws IOException {
-    EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
-    UserEntity userEntity =
-        entityStore.get(
-            NameIdentifierUtil.ofUser(metalake, username),
-            Entity.EntityType.USER,
-            UserEntity.class);
-    return userEntity;
+  // 
---------------------------------------------------------------------------
+  //  Metadata ID resolution with cache
+  // 
---------------------------------------------------------------------------
+
+  private Long resolveMetadataId(MetadataObject metadataObject, String 
metalake) {
+    String cacheKey = buildCacheKey(metalake, metadataObject);
+    Optional<Long> cached = metadataIdCache.getIfPresent(cacheKey);
+    if (cached.isPresent()) {
+      return cached.get();
+    }
+    Long id = MetadataIdConverter.getID(metadataObject, metalake);
+    metadataIdCache.put(cacheKey, id);
+    return id;
   }
 
+  // 
---------------------------------------------------------------------------
+  //  4-step role loading with version validation
+  // 
---------------------------------------------------------------------------
+
   private void loadRolePrivilege(
       String metalake, String username, Long userId, 
AuthorizationRequestContext requestContext) {
     requestContext.loadRole(
         () -> {
-          EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
-          NameIdentifier userNameIdentifier = 
NameIdentifierUtil.ofUser(metalake, username);
-          List<RoleEntity> entities;
-          try {
-            entities =
-                entityStore
-                    .relationOperations()
-                    .listEntitiesByRelation(
-                        SupportsRelationOperations.Type.ROLE_USER_REL,
-                        userNameIdentifier,
-                        Entity.EntityType.USER);
-            List<CompletableFuture<Void>> loadRoleFutures = new ArrayList<>();
-            for (RoleEntity role : entities) {
-              Long roleId = role.id();
-              allowEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
-              denyEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
-              if (loadedRoles.getIfPresent(roleId) != null) {
-                continue;
-              }
-              CompletableFuture<Void> loadRoleFuture =
-                  CompletableFuture.supplyAsync(
-                          () -> {
-                            try {
-                              return entityStore.get(
-                                  NameIdentifierUtil.ofRole(metalake, 
role.name()),
-                                  Entity.EntityType.ROLE,
-                                  RoleEntity.class);
-                            } catch (Exception e) {
-                              throw new RuntimeException("Failed to load role: 
" + role.name(), e);
-                            }
-                          },
-                          executor)
-                      .thenAcceptAsync(
-                          roleEntity -> {
-                            loadPolicyByRoleEntity(roleEntity);
-                            loadedRoles.put(roleId, true);
-                          },
-                          executor);
-              loadRoleFutures.add(loadRoleFuture);
-            }
-            CompletableFuture.allOf(loadRoleFutures.toArray(new 
CompletableFuture[0])).join();
-          } catch (IOException e) {
-            throw new RuntimeException(e);
+          // Step 1a: check user role cache version
+          UserAuthInfo userInfo =
+              SessionUtils.getWithoutCommit(
+                  UserMetaMapper.class, m -> m.getUserInfo(metalake, 
username));
+          if (userInfo == null) {
+            return;
+          }
+          List<Long> userRoleIds = loadUserRoles(metalake, username, userId, 
userInfo);
+
+          // Step 1b: check group role caches version
+          List<GroupAuthInfo> groupInfos =
+              SessionUtils.getWithoutCommit(
+                  GroupMetaMapper.class, m -> m.getGroupInfoByUserId(userId));
+          List<Long> groupRoleIds = loadGroupRoles(metalake, userId, 
groupInfos);
+
+          // Step 3: batch version-check all collected role IDs, load stale 
ones
+          List<Long> allRoleIds = new ArrayList<>(userRoleIds);
+          allRoleIds.addAll(groupRoleIds);
+          if (!allRoleIds.isEmpty()) {
+            versionCheckAndLoadRoles(metalake, allRoleIds);
           }
         });
   }
 
-  private void loadOwnerPolicy(String metalake, MetadataObject metadataObject, 
Long metadataId) {
-    if (ownerRel.getIfPresent(metadataId) != null) {
-      LOG.debug("Metadata {} OWNER has been loaded.", metadataId);
-      return;
+  private List<Long> loadUserRoles(
+      String metalake, String username, long userId, UserAuthInfo userInfo) {
+    String userCacheKey = metalake + KEY_SEP + username;
+    Optional<CachedUserRoles> cachedOpt = 
userRoleCache.getIfPresent(userCacheKey);
+
+    if (cachedOpt.isPresent() && cachedOpt.get().getUpdatedAt() >= 
userInfo.getUpdatedAt()) {
+      // Cache is still valid
+      CachedUserRoles cached = cachedOpt.get();
+      bindUserRoles(userId, cached.getRoleIds());
+      return cached.getRoleIds();
     }
-    try {
-      NameIdentifier entityIdent = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
-      EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
-      List<? extends Entity> owners =
-          entityStore
-              .relationOperations()
-              .listEntitiesByRelation(
-                  SupportsRelationOperations.Type.OWNER_REL,
-                  entityIdent,
-                  Entity.EntityType.valueOf(metadataObject.type().name()));
-      if (owners.isEmpty()) {
-        ownerRel.put(metadataId, Optional.empty());
+
+    // Cache miss or stale — reload from DB
+    List<RolePO> rolePOs =
+        SessionUtils.getWithoutCommit(RoleMetaMapper.class, m -> 
m.listRolesByUserId(userId));
+    List<Long> roleIds = 
rolePOs.stream().map(RolePO::getRoleId).collect(Collectors.toList());
+
+    userRoleCache.put(userCacheKey, new CachedUserRoles(userId, 
userInfo.getUpdatedAt(), roleIds));
+    bindUserRoles(userId, roleIds);
+    return roleIds;
+  }
+
+  private List<Long> loadGroupRoles(String metalake, long userId, 
List<GroupAuthInfo> groupInfos) {
+    List<Long> allGroupRoleIds = new ArrayList<>();
+    for (GroupAuthInfo groupInfo : groupInfos) {
+      long groupId = groupInfo.getGroupId();
+      String groupCacheKey = metalake + KEY_SEP + groupId;
+      Optional<CachedGroupRoles> cachedOpt = 
groupRoleCache.getIfPresent(groupCacheKey);
+
+      List<Long> roleIds;
+      if (cachedOpt.isPresent() && cachedOpt.get().getUpdatedAt() >= 
groupInfo.getUpdatedAt()) {
+        roleIds = cachedOpt.get().getRoleIds();
       } else {
-        for (Entity ownerEntity : owners) {
-          if (ownerEntity instanceof UserEntity) {
-            UserEntity user = (UserEntity) ownerEntity;
-            ownerRel.put(metadataId, Optional.of(user.id()));
-          }
+        List<RolePO> rolePOs =
+            SessionUtils.getWithoutCommit(RoleMetaMapper.class, m -> 
m.listRolesByGroupId(groupId));
+        roleIds = 
rolePOs.stream().map(RolePO::getRoleId).collect(Collectors.toList());
+        groupRoleCache.put(
+            groupCacheKey, new CachedGroupRoles(groupId, 
groupInfo.getUpdatedAt(), roleIds));
+      }
+      // Bind group roles to the user in JCasbin
+      bindUserRoles(userId, roleIds);
+      allGroupRoleIds.addAll(roleIds);
+    }
+    return allGroupRoleIds;
+  }
+
+  private void versionCheckAndLoadRoles(String metalake, List<Long> roleIds) {
+    // Batch fetch updated_at for all role IDs
+    List<Long> uniqueRoleIds = 
roleIds.stream().distinct().collect(Collectors.toList());
+    List<RoleUpdatedAt> roleVersions =
+        SessionUtils.getWithoutCommit(
+            RoleMetaMapper.class, m -> m.batchGetUpdatedAt(uniqueRoleIds));
+
+    for (RoleUpdatedAt rv : roleVersions) {
+      long roleId = rv.getRoleId();
+      long dbUpdatedAt = rv.getUpdatedAt();
+      Optional<Long> cachedUpdatedAt = loadedRoles.getIfPresent(roleId);
+
+      if (cachedUpdatedAt.isPresent() && cachedUpdatedAt.get() >= dbUpdatedAt) 
{
+        // Role policies are still current
+        continue;
+      }
+
+      // Stale or missing — evict old policies and reload
+      if (cachedUpdatedAt.isPresent()) {
+        allowEnforcer.deleteRole(String.valueOf(roleId));
+        denyEnforcer.deleteRole(String.valueOf(roleId));
+      }
+
+      // Load full role entity and add policies
+      try {
+        EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+        // We need to find the role name; get it from the mapper directly
+        RolePO rolePO = findRolePOById(roleId, metalake);
+        if (rolePO != null) {
+          RoleEntity roleEntity =
+              entityStore.get(
+                  NameIdentifierUtil.ofRole(metalake, rolePO.getRoleName()),
+                  Entity.EntityType.ROLE,
+                  RoleEntity.class);
+          loadPolicyByRoleEntity(roleEntity);
         }
+      } catch (Exception e) {
+        LOG.warn("Failed to load role policies for roleId {}", roleId, e);
+        continue;
       }
-    } catch (IOException e) {
-      LOG.warn("Can not load metadata owner", e);
+
+      loadedRoles.put(roleId, dbUpdatedAt);
     }
   }
 
+  private RolePO findRolePOById(long roleId, String metalake) {
+    // Get all role POs for the user's roles and find the one matching roleId
+    List<RolePO> rolePOs =
+        SessionUtils.getWithoutCommit(RoleMetaMapper.class, m -> 
m.listRolePOsByMetalake(metalake));
+    for (RolePO po : rolePOs) {
+      if (po.getRoleId() == roleId) {
+        return po;
+      }
+    }
+    return null;
+  }
+
+  private void bindUserRoles(long userId, List<Long> roleIds) {
+    for (Long roleId : roleIds) {
+      allowEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
+      denyEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  //  Owner loading
+  // 
---------------------------------------------------------------------------
+
+  private void loadOwnerPolicy(Long metadataId) {
+    if (ownerRelCache.getIfPresent(metadataId).isPresent()) {
+      LOG.debug("Metadata {} OWNER has been loaded.", metadataId);
+      return;
+    }
+
+    OwnerInfo ownerInfo =
+        SessionUtils.getWithoutCommit(
+            OwnerMetaMapper.class, m -> 
m.selectOwnerByMetadataObjectId(metadataId));
+    if (ownerInfo == null) {
+      ownerRelCache.put(metadataId, Optional.empty());
+    } else {
+      ownerRelCache.put(metadataId, Optional.of(ownerInfo.getOwnerId()));
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  //  Policy loading from role entity
+  // 
---------------------------------------------------------------------------
+
   private void loadPolicyByRoleEntity(RoleEntity roleEntity) {
     String metalake = 
NameIdentifierUtil.getMetalake(roleEntity.nameIdentifier());
     List<SecurableObject> securableObjects = roleEntity.securableObjects();
 
     for (SecurableObject securableObject : securableObjects) {
+      Long securableId = resolveMetadataId(securableObject, metalake);
       for (Privilege privilege : securableObject.privileges()) {
         Privilege.Condition condition = privilege.condition();
         if (AuthConstants.DENY.equalsIgnoreCase(condition.name())) {
           denyEnforcer.addPolicy(
               String.valueOf(roleEntity.id()),
               securableObject.type().name(),
-              String.valueOf(MetadataIdConverter.getID(securableObject, 
metalake)),
+              String.valueOf(securableId),
               AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
                   .name()
-                  .toUpperCase(java.util.Locale.ROOT),
+                  .toUpperCase(Locale.ROOT),
               AuthConstants.ALLOW);
         }
-        // Since different roles of a user may simultaneously hold both 
"allow" and "deny"
-        // permissions
-        // for the same privilege on a given MetadataObject, the allowEnforcer 
must also incorporate
-        // the "deny" privilege to ensure that the authorize method correctly 
returns false in such
-        // cases. For example, if role1 has an "allow" privilege for 
SELECT_TABLE on table1, while
-        // role2 has a "deny" privilege for the same action on table1, then a 
user assigned both
-        // roles should receive a false result when calling the authorize 
method.
 
         allowEnforcer.addPolicy(
             String.valueOf(roleEntity.id()),
             securableObject.type().name(),
-            String.valueOf(MetadataIdConverter.getID(securableObject, 
metalake)),
+            String.valueOf(securableId),
             AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
                 .name()
-                .toUpperCase(java.util.Locale.ROOT),
-            condition.name().toLowerCase(java.util.Locale.ROOT));
+                .toUpperCase(Locale.ROOT),
+            condition.name().toLowerCase(Locale.ROOT));
+      }
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  //  Change poller (eventual consistency for HA)
+  // 
---------------------------------------------------------------------------
+
+  @VisibleForTesting
+  void pollChanges() {
+    try {
+      pollOwnerChanges();
+    } catch (Exception e) {
+      LOG.warn("Owner change poll failed", e);
+    }
+    try {
+      pollEntityChanges();
+    } catch (Exception e) {
+      LOG.warn("Entity change poll failed", e);
+    }
+  }
+
+  private void pollOwnerChanges() {
+    List<ChangedOwnerInfo> changes =
+        SessionUtils.getWithoutCommit(
+            OwnerMetaMapper.class, m -> 
m.selectChangedOwners(ownerPollHighWaterMark));
+
+    long maxSeen = ownerPollHighWaterMark;
+    for (ChangedOwnerInfo change : changes) {
+      ownerRelCache.invalidate(change.getMetadataObjectId());
+      if (change.getUpdatedAt() > maxSeen) {
+        maxSeen = change.getUpdatedAt();
+      }
+    }
+    ownerPollHighWaterMark = maxSeen;
+  }
+
+  private void pollEntityChanges() {
+    List<EntityChangeRecord> changes =
+        SessionUtils.getWithoutCommit(
+            EntityChangeLogMapper.class,
+            m -> m.selectChanges(entityPollHighWaterMark, POLLER_MAX_ROWS));
+
+    long maxSeen = entityPollHighWaterMark;
+    for (EntityChangeRecord change : changes) {
+      String metalake = change.getMetalakeName();
+      String entityType = change.getEntityType();
+      String fullName = change.getFullName();
+
+      MetadataObject.Type mdType;
+      try {
+        mdType = 
MetadataObject.Type.valueOf(entityType.toUpperCase(Locale.ROOT));
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Unknown entity type in change log: {}", entityType);
+        if (change.getCreatedAt() > maxSeen) {
+          maxSeen = change.getCreatedAt();
+        }
+        continue;
       }
+
+      MetadataObject mdObj = 
MetadataObjects.of(Arrays.asList(fullName.split("\\.")), mdType);
+      String cacheKey = buildCacheKey(metalake, mdObj);
+
+      if (isNonLeaf(mdType)) {
+        metadataIdCache.invalidateByPrefix(cacheKey);
+      } else {
+        metadataIdCache.invalidate(cacheKey);
+      }
+
+      if (change.getCreatedAt() > maxSeen) {
+        maxSeen = change.getCreatedAt();
+      }
+    }
+    entityPollHighWaterMark = maxSeen;
+  }
+
+  // 
---------------------------------------------------------------------------
+  //  Helpers
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Builds a hierarchical cache key for the metadataIdCache. Non-leaf objects 
end with "::" to
+   * enable prefix-based cascade invalidation.
+   *
+   * <p>Examples: metalake::catalog:: , metalake::catalog::schema:: ,
+   * metalake::catalog::schema::table::TABLE
+   */
+  @VisibleForTesting
+  static String buildCacheKey(String metalake, MetadataObject metadataObject) {
+    StringBuilder sb = new StringBuilder(metalake);
+    sb.append(KEY_SEP);
+    // fullName uses '.' as separator, e.g. "catalog1.schema1.table1"
+    String[] parts = metadataObject.fullName().split("\\.");
+    sb.append(String.join(KEY_SEP, parts));
+    if (isNonLeaf(metadataObject.type())) {
+      // Trailing separator enables prefix-based cascade invalidation
+      sb.append(KEY_SEP);
+    } else {
+      // Leaf nodes get the type suffix to avoid collisions
+      sb.append(KEY_SEP);
+      sb.append(metadataObject.type().name());
+    }
+    return sb.toString();
+  }
+
+  /** Returns true for entity types that can contain children (metalake, 
catalog, schema). */
+  @VisibleForTesting
+  static boolean isNonLeaf(MetadataObject.Type type) {
+    return type == MetadataObject.Type.METALAKE
+        || type == MetadataObject.Type.CATALOG
+        || type == MetadataObject.Type.SCHEMA;
+  }
+
+  private static UserEntity getUserEntity(String username, String metalake) 
throws IOException {
+    EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+    return entityStore.get(
+        NameIdentifierUtil.ofUser(metalake, username), Entity.EntityType.USER, 
UserEntity.class);
+  }
+
+  // 
---------------------------------------------------------------------------
+  //  LoadedRoles cache — wraps CaffeineGravitinoCache with eviction 
side-effects
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * A specialized GravitinoCache for loaded roles that cleans up JCasbin 
policies on eviction. This
+   * uses a raw Caffeine cache internally so that we can attach a removal 
listener.
+   */
+  private static class LoadedRolesCache implements GravitinoCache<Long, Long> {
+
+    private final com.github.benmanes.caffeine.cache.Cache<Long, Long> cache;
+
+    LoadedRolesCache(long ttlMs, long maxSize, Enforcer allowEnforcer, 
Enforcer denyEnforcer) {
+      this.cache =
+          com.github.benmanes.caffeine.cache.Caffeine.newBuilder()
+              .expireAfterAccess(ttlMs, TimeUnit.MILLISECONDS)
+              .maximumSize(maxSize)
+              .executor(Runnable::run)
+              .removalListener(
+                  (roleId, value, cause) -> {
+                    if (roleId != null) {
+                      allowEnforcer.deleteRole(String.valueOf(roleId));
+                      denyEnforcer.deleteRole(String.valueOf(roleId));
+                    }
+                  })
+              .build();
+    }
+
+    @Override
+    public Optional<Long> getIfPresent(Long key) {
+      Long v = cache.getIfPresent(key);
+      return Optional.ofNullable(v);
+    }
+
+    @Override
+    public void put(Long key, Long value) {
+      cache.put(key, value);
+    }
+
+    @Override
+    public void invalidate(Long key) {
+      cache.invalidate(key);
+    }
+
+    @Override
+    public void invalidateAll() {
+      cache.invalidateAll();
+    }
+
+    @Override
+    public void invalidateByPrefix(String prefix) {
+      cache.asMap().keySet().removeIf(k -> k.toString().startsWith(prefix));
+    }
+
+    @Override
+    public long size() {
+      cache.cleanUp();
+      return cache.estimatedSize();
+    }
+
+    @Override
+    public void close() {
+      cache.invalidateAll();
+      cache.cleanUp();
     }
   }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizerCacheHelpers.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizerCacheHelpers.java
new file mode 100644
index 0000000000..c82c0debf1
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizerCacheHelpers.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.authorization.jcasbin;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for JcasbinAuthorizer static helpers and cached role snapshot 
classes. */
+public class TestJcasbinAuthorizerCacheHelpers {
+
+  // ---------- buildCacheKey ----------
+
+  @Test
+  void testBuildCacheKeyMetalake() {
+    MetadataObject obj =
+        MetadataObjects.of(Collections.singletonList("ml1"), 
MetadataObject.Type.METALAKE);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    // Metalake is non-leaf → trailing "::"
+    Assertions.assertEquals("ml1::ml1::", key);
+  }
+
+  @Test
+  void testBuildCacheKeyCatalog() {
+    MetadataObject obj =
+        MetadataObjects.of(Collections.singletonList("cat1"), 
MetadataObject.Type.CATALOG);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    Assertions.assertEquals("ml1::cat1::", key);
+  }
+
+  @Test
+  void testBuildCacheKeySchema() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1"), 
MetadataObject.Type.SCHEMA);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    // Schema is non-leaf → trailing "::"
+    Assertions.assertEquals("ml1::cat1::sch1::", key);
+  }
+
+  @Test
+  void testBuildCacheKeyTable() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "tbl1"), 
MetadataObject.Type.TABLE);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    // Table is a leaf → type suffix
+    Assertions.assertEquals("ml1::cat1::sch1::tbl1::TABLE", key);
+  }
+
+  @Test
+  void testBuildCacheKeyFileset() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "fs1"), 
MetadataObject.Type.FILESET);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    Assertions.assertEquals("ml1::cat1::sch1::fs1::FILESET", key);
+  }
+
+  @Test
+  void testBuildCacheKeyTopic() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "topic1"), 
MetadataObject.Type.TOPIC);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    Assertions.assertEquals("ml1::cat1::sch1::topic1::TOPIC", key);
+  }
+
+  @Test
+  void testBuildCacheKeyModel() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "model1"), 
MetadataObject.Type.MODEL);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    Assertions.assertEquals("ml1::cat1::sch1::model1::MODEL", key);
+  }
+
+  @Test
+  void testBuildCacheKeyView() {
+    MetadataObject obj =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "v1"), 
MetadataObject.Type.VIEW);
+    String key = JcasbinAuthorizer.buildCacheKey("ml1", obj);
+    Assertions.assertEquals("ml1::cat1::sch1::v1::VIEW", key);
+  }
+
+  // ---------- isNonLeaf ----------
+
+  @Test
+  void testIsNonLeafMetalake() {
+    
Assertions.assertTrue(JcasbinAuthorizer.isNonLeaf(MetadataObject.Type.METALAKE));
+  }
+
+  @Test
+  void testIsNonLeafCatalog() {
+    
Assertions.assertTrue(JcasbinAuthorizer.isNonLeaf(MetadataObject.Type.CATALOG));
+  }
+
+  @Test
+  void testIsNonLeafSchema() {
+    
Assertions.assertTrue(JcasbinAuthorizer.isNonLeaf(MetadataObject.Type.SCHEMA));
+  }
+
+  @Test
+  void testIsLeafTable() {
+    
Assertions.assertFalse(JcasbinAuthorizer.isNonLeaf(MetadataObject.Type.TABLE));
+  }
+
+  @Test
+  void testIsLeafFileset() {
+    
Assertions.assertFalse(JcasbinAuthorizer.isNonLeaf(MetadataObject.Type.FILESET));
+  }
+
+  @Test
+  void testIsLeafTopic() {
+    
Assertions.assertFalse(JcasbinAuthorizer.isNonLeaf(MetadataObject.Type.TOPIC));
+  }
+
+  // ---------- Prefix cascade key hierarchy ----------
+
+  @Test
+  void testCascadeInvalidationKeyHierarchy() {
+    // Dropping a catalog should use a prefix that covers all schemas and 
tables below it
+    MetadataObject catalog =
+        MetadataObjects.of(Collections.singletonList("cat1"), 
MetadataObject.Type.CATALOG);
+    String catalogKey = JcasbinAuthorizer.buildCacheKey("ml1", catalog);
+
+    MetadataObject schema =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1"), 
MetadataObject.Type.SCHEMA);
+    String schemaKey = JcasbinAuthorizer.buildCacheKey("ml1", schema);
+
+    MetadataObject table =
+        MetadataObjects.of(Arrays.asList("cat1", "sch1", "tbl1"), 
MetadataObject.Type.TABLE);
+    String tableKey = JcasbinAuthorizer.buildCacheKey("ml1", table);
+
+    // Schema key starts with catalog key prefix
+    Assertions.assertTrue(
+        schemaKey.startsWith(catalogKey),
+        "schema key should start with catalog prefix for cascade 
invalidation");
+    // Table key starts with catalog key prefix
+    Assertions.assertTrue(
+        tableKey.startsWith(catalogKey),
+        "table key should start with catalog prefix for cascade invalidation");
+    // Table key starts with schema key prefix
+    Assertions.assertTrue(
+        tableKey.startsWith(schemaKey),
+        "table key should start with schema prefix for cascade invalidation");
+  }
+
+  // ---------- CachedUserRoles ----------
+
+  @Test
+  void testCachedUserRoles() {
+    List<Long> roleIds = Arrays.asList(10L, 20L, 30L);
+    CachedUserRoles cur = new CachedUserRoles(1L, 1000L, roleIds);
+    Assertions.assertEquals(1L, cur.getUserId());
+    Assertions.assertEquals(1000L, cur.getUpdatedAt());
+    Assertions.assertEquals(roleIds, cur.getRoleIds());
+  }
+
+  // ---------- CachedGroupRoles ----------
+
+  @Test
+  void testCachedGroupRoles() {
+    List<Long> roleIds = Arrays.asList(100L, 200L);
+    CachedGroupRoles cgr = new CachedGroupRoles(5L, 2000L, roleIds);
+    Assertions.assertEquals(5L, cgr.getGroupId());
+    Assertions.assertEquals(2000L, cgr.getUpdatedAt());
+    Assertions.assertEquals(roleIds, cgr.getRoleIds());
+  }
+}

Reply via email to