This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new c962017ec8 [#9535] feat(core): support FUNCTION entity for authorization wiring (#10855)
c962017ec8 is described below

commit c962017ec877bafed02eddcc0857eba7b28c9686
Author: mchades <[email protected]>
AuthorDate: Tue Apr 28 12:19:30 2026 +0800

    [#9535] feat(core): support FUNCTION entity for authorization wiring (#10855)
    
    ### What changes were proposed in this pull request?
    
    Wire the `FUNCTION` entity type into Gravitino's core authorization
    infrastructure, following the model privilege pattern.
    
    Key changes:
    - **`FunctionHookDispatcher`** (new) — sets ownership on
    `registerFunction`, delegates all other operations; inserted into the
    dispatcher chain in `GravitinoEnv`
    - **`RelationalEntityStoreIdResolver`** — add `FUNCTION` to
    `ENTITY_TYPES_REQUIRING_SCHEMA_IDS` and implement the `FUNCTION` case
    using `FunctionMetaService.getFunctionIdBySchemaIdAndFunctionName`
    - **`FunctionMetaService`** — new
    `getFunctionIdBySchemaIdAndFunctionName` method
    - **`FunctionMetaMapper` / `FunctionMetaSQLProviderFactory` /
    `FunctionMetaBaseSQLProvider`** — new
    `selectFunctionIdBySchemaIdAndFunctionName` query
    - **`MetadataObjectService`** — `getFunctionObjectsFullName` for
    ownership look-up
    - **`MetadataObjectUtil`** — `FUNCTION` in `TYPE_TO_TYPE_MAP`,
    `toEntityIdent`, `checkMetadataObject`
    - **`NameIdentifierUtil`** — `FUNCTION` in `toMetadataObject` and
    `parentEntityType`
    
    ### Why are the changes needed?
    
    Without these plumbing changes, the `OwnerManager` cannot resolve a
    `FUNCTION` entity ID, causing an `Unsupported entity type: FUNCTION`
    error when a function is registered.
    
    Fix: #9535
    
    ### Does this PR introduce _any_ user-facing change?
    
    No user-facing API changes. Internal plumbing only.
    
    ### How was this patch tested?
    
    Unit tests pass: `./gradlew :core:test -PskipITs`
    
    End-to-end integration test `FunctionAuthorizationIT` (in a follow-up
    PR) covers the full ownership and privilege flow.
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../java/org/apache/gravitino/GravitinoEnv.java    |   9 +-
 .../gravitino/hook/FunctionHookDispatcher.java     | 105 ++++++
 .../RelationalEntityStoreIdResolver.java           |  13 +
 .../relational/mapper/FunctionMetaMapper.java      |  16 +
 .../mapper/FunctionMetaSQLProviderFactory.java     |  10 +
 .../provider/base/FunctionMetaBaseSQLProvider.java |  23 ++
 .../provider/base/OwnerMetaBaseSQLProvider.java    |  11 +
 .../base/SecurableObjectBaseSQLProvider.java       |  11 +
 .../postgresql/OwnerMetaPostgreSQLProvider.java    |  11 +
 .../SecurableObjectPostgreSQLProvider.java         |  11 +
 .../relational/service/FunctionMetaService.java    |  33 +-
 .../relational/service/MetadataObjectService.java  |  53 +++
 .../apache/gravitino/utils/MetadataObjectUtil.java |   7 +
 .../apache/gravitino/utils/NameIdentifierUtil.java |   6 +
 .../gravitino/hook/TestFunctionHookDispatcher.java | 120 ++++++
 .../storage/TestEntityStorageRelationCache.java    | 414 +++++++++++++++++++++
 .../storage/relational/TestJDBCBackend.java        |  31 ++
 .../service/TestFunctionMetaService.java           | 143 +++++--
 .../service/TestMetadataObjectService.java         | 132 +++++++
 .../relational/service/TestOwnerMetaService.java   |  74 +++-
 .../relational/service/TestSecurableObjects.java   |  69 +++-
 21 files changed, 1234 insertions(+), 68 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index d7c84baca3..f3b2e40ec7 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -58,6 +58,7 @@ import 
org.apache.gravitino.credential.CredentialOperationDispatcher;
 import org.apache.gravitino.hook.AccessControlHookDispatcher;
 import org.apache.gravitino.hook.CatalogHookDispatcher;
 import org.apache.gravitino.hook.FilesetHookDispatcher;
+import org.apache.gravitino.hook.FunctionHookDispatcher;
 import org.apache.gravitino.hook.JobHookDispatcher;
 import org.apache.gravitino.hook.MetalakeHookDispatcher;
 import org.apache.gravitino.hook.ModelHookDispatcher;
@@ -613,14 +614,16 @@ public class GravitinoEnv {
         new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
     this.modelDispatcher = new ModelEventDispatcher(eventBus, 
modelNormalizeDispatcher);
 
-    // TODO: Add FunctionHookDispatcher when needed
     // The operation chain is:
-    // FunctionEventDispatcher -> FunctionNormalizeDispatcher -> 
FunctionOperationDispatcher
+    // FunctionEventDispatcher -> FunctionNormalizeDispatcher -> 
FunctionHookDispatcher ->
+    // FunctionOperationDispatcher
     FunctionOperationDispatcher functionOperationDispatcher =
         new FunctionOperationDispatcher(
             catalogManager, schemaOperationDispatcher, entityStore, 
idGenerator);
+    FunctionHookDispatcher functionHookDispatcher =
+        new FunctionHookDispatcher(functionOperationDispatcher);
     FunctionNormalizeDispatcher functionNormalizeDispatcher =
-        new FunctionNormalizeDispatcher(functionOperationDispatcher, 
catalogManager);
+        new FunctionNormalizeDispatcher(functionHookDispatcher, 
catalogManager);
     this.functionDispatcher = new FunctionEventDispatcher(eventBus, 
functionNormalizeDispatcher);
 
     // TODO: Add ViewHookDispatcher and ViewEventDispatcher when needed for 
view-specific hooks
diff --git 
a/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java 
b/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java
new file mode 100644
index 0000000000..6898c45198
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/hook/FunctionHookDispatcher.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.hook;
+
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.exceptions.FunctionAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionChange;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+
+/**
+ * {@code FunctionHookDispatcher} is a decorator for {@link 
FunctionDispatcher} that not only
+ * delegates function operations to the underlying function dispatcher but 
also executes some hook
+ * operations before or after the underlying operations.
+ */
+public class FunctionHookDispatcher implements FunctionDispatcher {
+
+  private final FunctionDispatcher dispatcher;
+
+  public FunctionHookDispatcher(FunctionDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public NameIdentifier[] listFunctions(Namespace namespace) throws 
NoSuchSchemaException {
+    return dispatcher.listFunctions(namespace);
+  }
+
+  @Override
+  public Function[] listFunctionInfos(Namespace namespace) throws 
NoSuchSchemaException {
+    return dispatcher.listFunctionInfos(namespace);
+  }
+
+  @Override
+  public Function getFunction(NameIdentifier ident) throws 
NoSuchFunctionException {
+    return dispatcher.getFunction(ident);
+  }
+
+  @Override
+  public boolean functionExists(NameIdentifier ident) {
+    return dispatcher.functionExists(ident);
+  }
+
+  @Override
+  public Function registerFunction(
+      NameIdentifier ident,
+      String comment,
+      FunctionType functionType,
+      boolean deterministic,
+      FunctionDefinition[] definitions)
+      throws NoSuchSchemaException, FunctionAlreadyExistsException {
+    Function function =
+        dispatcher.registerFunction(ident, comment, functionType, 
deterministic, definitions);
+
+    // Set the creator as owner of the function.
+    OwnerDispatcher ownerManager = 
GravitinoEnv.getInstance().ownerDispatcher();
+    if (ownerManager != null) {
+      ownerManager.setOwner(
+          ident.namespace().level(0),
+          NameIdentifierUtil.toMetadataObject(ident, 
Entity.EntityType.FUNCTION),
+          PrincipalUtils.getCurrentUserName(),
+          Owner.Type.USER);
+    }
+    return function;
+  }
+
+  @Override
+  public Function alterFunction(NameIdentifier ident, FunctionChange... 
changes)
+      throws NoSuchFunctionException, IllegalArgumentException {
+    return dispatcher.alterFunction(ident, changes);
+  }
+
+  @Override
+  public boolean dropFunction(NameIdentifier ident) {
+    return dispatcher.dropFunction(ident);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
index 74f1b325fe..895572719a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStoreIdResolver.java
@@ -29,6 +29,7 @@ import 
org.apache.gravitino.storage.relational.helper.CatalogIds;
 import org.apache.gravitino.storage.relational.helper.SchemaIds;
 import org.apache.gravitino.storage.relational.service.CatalogMetaService;
 import org.apache.gravitino.storage.relational.service.FilesetMetaService;
+import org.apache.gravitino.storage.relational.service.FunctionMetaService;
 import org.apache.gravitino.storage.relational.service.GroupMetaService;
 import org.apache.gravitino.storage.relational.service.JobTemplateMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
@@ -67,6 +68,7 @@ public class RelationalEntityStoreIdResolver implements 
EntityIdResolver {
           Entity.EntityType.TOPIC,
           Entity.EntityType.MODEL,
           Entity.EntityType.COLUMN,
+          Entity.EntityType.FUNCTION,
           Entity.EntityType.VIEW);
 
   @Override
@@ -233,6 +235,17 @@ public class RelationalEntityStoreIdResolver implements 
EntityIdResolver {
         return new NamespacedEntityId(
             viewId, schemaIds.getMetalakeId(), schemaIds.getCatalogId(), 
schemaIds.getSchemaId());
 
+      case FUNCTION:
+        long functionId =
+            FunctionMetaService.getInstance()
+                .getFunctionIdBySchemaIdAndFunctionName(
+                    schemaIds.getSchemaId(), nameIdentifier.name());
+        return new NamespacedEntityId(
+            functionId,
+            schemaIds.getMetalakeId(),
+            schemaIds.getCatalogId(),
+            schemaIds.getSchemaId());
+
       default:
         throw new IllegalArgumentException("Unsupported entity type: " + type);
     }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
index ad50f07799..4c307d5afa 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaMapper.java
@@ -85,6 +85,16 @@ public interface FunctionMetaMapper {
   @SelectProvider(type = FunctionMetaSQLProviderFactory.class, method = 
"listFunctionPOsBySchemaId")
   List<FunctionPO> listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId);
 
+  @Results({
+    @Result(property = "functionId", column = "function_id", id = true),
+    @Result(property = "functionName", column = "function_name"),
+    @Result(property = "schemaId", column = "schema_id"),
+  })
+  @SelectProvider(
+      type = FunctionMetaSQLProviderFactory.class,
+      method = "listFunctionPOsByFunctionIds")
+  List<FunctionPO> listFunctionPOsByFunctionIds(@Param("functionIds") 
List<Long> functionIds);
+
   @Results(
       id = "functionPOResultMap",
       value = {
@@ -132,6 +142,12 @@ public interface FunctionMetaMapper {
   FunctionPO selectFunctionMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName);
 
+  @SelectProvider(
+      type = FunctionMetaSQLProviderFactory.class,
+      method = "selectFunctionIdBySchemaIdAndFunctionName")
+  Long selectFunctionIdBySchemaIdAndFunctionName(
+      @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName);
+
   @UpdateProvider(
       type = FunctionMetaSQLProviderFactory.class,
       method = "softDeleteFunctionMetaByFunctionId")
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
index 239a19c044..1c2110b25b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FunctionMetaSQLProviderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.storage.relational.mapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
 import 
org.apache.gravitino.storage.relational.mapper.provider.base.FunctionMetaBaseSQLProvider;
@@ -64,6 +65,10 @@ public class FunctionMetaSQLProviderFactory {
     return getProvider().listFunctionPOsBySchemaId(schemaId);
   }
 
+  public static String listFunctionPOsByFunctionIds(@Param("functionIds") 
List<Long> functionIds) {
+    return getProvider().listFunctionPOsByFunctionIds(functionIds);
+  }
+
   public static String listFunctionPOsByFullQualifiedName(
       @Param("metalakeName") String metalakeName,
       @Param("catalogName") String catalogName,
@@ -85,6 +90,11 @@ public class FunctionMetaSQLProviderFactory {
     return getProvider().selectFunctionMetaBySchemaIdAndName(schemaId, 
functionName);
   }
 
+  public static String selectFunctionIdBySchemaIdAndFunctionName(
+      @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName) {
+    return getProvider().selectFunctionIdBySchemaIdAndFunctionName(schemaId, 
functionName);
+  }
+
   public static String softDeleteFunctionMetaByFunctionId(@Param("functionId") 
Long functionId) {
     return getProvider().softDeleteFunctionMetaByFunctionId(functionId);
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
index ecd11a5232..88d229bcf7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/FunctionMetaBaseSQLProvider.java
@@ -21,6 +21,7 @@ package 
org.apache.gravitino.storage.relational.mapper.provider.base;
 import static 
org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper.TABLE_NAME;
 import static 
org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper.VERSION_TABLE_NAME;
 
+import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
@@ -200,6 +201,20 @@ public class FunctionMetaBaseSQLProvider {
         + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND 
vi.deleted_at = 0";
   }
 
+  public String listFunctionPOsByFunctionIds(@Param("functionIds") List<Long> 
functionIds) {
+    return "<script>"
+        + " SELECT function_id, function_name, schema_id"
+        + " FROM "
+        + TABLE_NAME
+        + " WHERE deleted_at = 0"
+        + " AND function_id IN ("
+        + "<foreach collection='functionIds' item='functionId' separator=','>"
+        + "#{functionId}"
+        + "</foreach>"
+        + ") "
+        + "</script>";
+  }
+
   public String selectFunctionMetaBySchemaIdAndName(
       @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName) {
     return "SELECT fm.function_id, fm.function_name, fm.metalake_id, 
fm.catalog_id, fm.schema_id,"
@@ -219,6 +234,14 @@ public class FunctionMetaBaseSQLProvider {
         + " AND fm.deleted_at = 0 AND vi.deleted_at = 0";
   }
 
+  public String selectFunctionIdBySchemaIdAndFunctionName(
+      @Param("schemaId") Long schemaId, @Param("functionName") String 
functionName) {
+    return "SELECT function_id"
+        + " FROM "
+        + TABLE_NAME
+        + " WHERE schema_id = #{schemaId} AND function_name = #{functionName} 
AND deleted_at = 0";
+  }
+
   public String softDeleteFunctionMetaByFunctionId(@Param("functionId") Long 
functionId) {
     return "UPDATE "
         + TABLE_NAME
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
index a7ce2b8247..478aa8f618 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/OwnerMetaBaseSQLProvider.java
@@ -23,6 +23,7 @@ import static 
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper.OWN
 import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
@@ -189,6 +190,11 @@ public class OwnerMetaBaseSQLProvider {
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.catalog_id = #{catalogId} AND"
         + " vt.view_id = ot.metadata_object_id AND ot.metadata_object_type = 
'VIEW'"
+        + " UNION"
+        + " SELECT fnt.catalog_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.catalog_id = #{catalogId} AND"
+        + " fnt.function_id = ot.metadata_object_id AND 
ot.metadata_object_type = 'FUNCTION'"
         + ")";
   }
 
@@ -227,6 +233,11 @@ public class OwnerMetaBaseSQLProvider {
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.schema_id = #{schemaId} AND"
         + " vt.view_id = ot.metadata_object_id AND ot.metadata_object_type = 
'VIEW'"
+        + " UNION"
+        + " SELECT fnt.schema_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.schema_id = #{schemaId} AND"
+        + " fnt.function_id = ot.metadata_object_id AND 
ot.metadata_object_type = 'FUNCTION'"
         + ")";
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
index 69e9d9e2e3..a33346e727 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/SecurableObjectBaseSQLProvider.java
@@ -24,6 +24,7 @@ import static 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapp
 import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
@@ -140,6 +141,11 @@ public class SecurableObjectBaseSQLProvider {
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.catalog_id = #{catalogId} AND"
         + " vt.view_id = sect.metadata_object_id AND sect.type = 'VIEW'"
+        + " UNION"
+        + " SELECT fnt.catalog_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.catalog_id = #{catalogId} AND"
+        + " fnt.function_id = sect.metadata_object_id AND sect.type = 
'FUNCTION'"
         + ")";
   }
 
@@ -178,6 +184,11 @@ public class SecurableObjectBaseSQLProvider {
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.schema_id = #{schemaId} AND"
         + " vt.view_id = sect.metadata_object_id AND sect.type = 'VIEW'"
+        + " UNION"
+        + " SELECT fnt.schema_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.schema_id = #{schemaId} AND"
+        + " fnt.function_id = sect.metadata_object_id AND sect.type = 
'FUNCTION'"
         + ")";
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/OwnerMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/OwnerMetaPostgreSQLProvider.java
index 4012bf52d5..d0fe369261 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/OwnerMetaPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/OwnerMetaPostgreSQLProvider.java
@@ -22,6 +22,7 @@ import static 
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper.OWN
 
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
@@ -96,6 +97,11 @@ public class OwnerMetaPostgreSQLProvider extends 
OwnerMetaBaseSQLProvider {
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.catalog_id = #{catalogId} AND"
         + " vt.view_id = ot.metadata_object_id AND ot.metadata_object_type = 
'VIEW'"
+        + " UNION"
+        + " SELECT fnt.catalog_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.catalog_id = #{catalogId} AND"
+        + " fnt.function_id = ot.metadata_object_id AND 
ot.metadata_object_type = 'FUNCTION'"
         + ")";
   }
 
@@ -134,6 +140,11 @@ public class OwnerMetaPostgreSQLProvider extends 
OwnerMetaBaseSQLProvider {
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.schema_id = #{schemaId} AND"
         + " vt.view_id = ot.metadata_object_id AND ot.metadata_object_type = 
'VIEW'"
+        + " UNION"
+        + " SELECT fnt.schema_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.schema_id = #{schemaId} AND"
+        + " fnt.function_id = ot.metadata_object_id AND 
ot.metadata_object_type = 'FUNCTION'"
         + ")";
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
index 9d07936e8b..ba25da2dc0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
@@ -24,6 +24,7 @@ import static 
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapp
 import java.util.List;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
@@ -118,6 +119,11 @@ public class SecurableObjectPostgreSQLProvider extends 
SecurableObjectBaseSQLPro
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.catalog_id = #{catalogId} AND"
         + " vt.view_id = sect.metadata_object_id AND sect.type = 'VIEW'"
+        + " UNION"
+        + " SELECT fnt.catalog_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.catalog_id = #{catalogId} AND"
+        + " fnt.function_id = sect.metadata_object_id AND sect.type = 
'FUNCTION'"
         + ")";
   }
 
@@ -156,6 +162,11 @@ public class SecurableObjectPostgreSQLProvider extends 
SecurableObjectBaseSQLPro
         + ViewMetaMapper.TABLE_NAME
         + " vt WHERE vt.schema_id = #{schemaId} AND"
         + " vt.view_id = sect.metadata_object_id AND sect.type = 'VIEW'"
+        + " UNION"
+        + " SELECT fnt.schema_id FROM "
+        + FunctionMetaMapper.TABLE_NAME
+        + " fnt WHERE fnt.schema_id = #{schemaId} AND"
+        + " fnt.function_id = sect.metadata_object_id AND sect.type = 
'FUNCTION'"
         + ")";
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
index c6ca471079..0377afed00 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
@@ -43,6 +44,8 @@ import org.apache.gravitino.meta.NamespacedEntityId;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.FunctionVersionMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
 import org.apache.gravitino.storage.relational.po.FunctionMaxVersionPO;
 import org.apache.gravitino.storage.relational.po.FunctionPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
@@ -80,6 +83,24 @@ public class FunctionMetaService {
     return fromFunctionPO(functionPO, ident.namespace());
   }
 
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "getFunctionIdBySchemaIdAndFunctionName")
+  public Long getFunctionIdBySchemaIdAndFunctionName(Long schemaId, String 
functionName) {
+    Long functionId =
+        SessionUtils.getWithoutCommit(
+            FunctionMetaMapper.class,
+            mapper -> 
mapper.selectFunctionIdBySchemaIdAndFunctionName(schemaId, functionName));
+
+    if (functionId == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
+          functionName);
+    }
+    return functionId;
+  }
+
   @Monitored(
       metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
       baseMetricName = "insertFunction")
@@ -135,12 +156,22 @@ public class FunctionMetaService {
                     FunctionMetaMapper.class,
                     mapper -> 
mapper.softDeleteFunctionMetaByFunctionId(functionId))),
 
-        // delete function versions after meta deletion
+        // delete function versions, owner rels, and securable object rels 
after meta deletion
         () -> {
           if (functionDeletedCount.get() > 0) {
             SessionUtils.doWithoutCommit(
                 FunctionVersionMetaMapper.class,
                 mapper -> 
mapper.softDeleteFunctionVersionsByFunctionId(functionId));
+            SessionUtils.doWithoutCommit(
+                OwnerMetaMapper.class,
+                mapper ->
+                    mapper.softDeleteOwnerRelByMetadataObjectIdAndType(
+                        functionId, MetadataObject.Type.FUNCTION.name()));
+            SessionUtils.doWithoutCommit(
+                SecurableObjectMapper.class,
+                mapper ->
+                    mapper.softDeleteObjectRelsByMetadataObject(
+                        functionId, MetadataObject.Type.FUNCTION.name()));
           }
         });
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index efb7c7e8d3..ee8f7ff8ad 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -37,6 +37,7 @@ import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.metrics.Monitored;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
@@ -50,6 +51,7 @@ import 
org.apache.gravitino.storage.relational.mapper.ViewMetaMapper;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
 import org.apache.gravitino.storage.relational.po.JobTemplatePO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
 import org.apache.gravitino.storage.relational.po.ModelPO;
@@ -82,6 +84,7 @@ public class MetadataObjectService {
               .put(MetadataObject.Type.TABLE, 
MetadataObjectService::getTableObjectsFullName)
               .put(MetadataObject.Type.FILESET, 
MetadataObjectService::getFilesetObjectsFullName)
               .put(MetadataObject.Type.MODEL, 
MetadataObjectService::getModelObjectsFullName)
+              .put(MetadataObject.Type.FUNCTION, 
MetadataObjectService::getFunctionObjectsFullName)
               .put(MetadataObject.Type.TOPIC, 
MetadataObjectService::getTopicObjectsFullName)
               .put(MetadataObject.Type.VIEW, 
MetadataObjectService::getViewObjectsFullName)
               .put(MetadataObject.Type.COLUMN, 
MetadataObjectService::getColumnObjectsFullName)
@@ -325,6 +328,56 @@ public class MetadataObjectService {
     return modelIdAndNameMap;
   }
 
+  /**
+   * Resolves the full names of the given Function objects.
+   *
+   * <p>A function's full name is the full name of its schema, as returned by {@code
+   * getSchemaObjectsFullName}, joined with the function name, for example {@code
+   * "catalog1.schema1.function1"}.
+   *
+   * @param functionIds The IDs of the Function objects to resolve; may be null or empty.
+   * @return A mutable map from Function ID to Function full name. A value is null when the full
+   *     name of the function's schema cannot be resolved (for example because the schema was
+   *     deleted). The map is empty when {@code functionIds} is null or empty, or when no Function
+   *     objects are found for the given IDs.
+   */
+  @Monitored(
+      metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
+      baseMetricName = "getFunctionObjectsFullName")
+  public static Map<Long, String> getFunctionObjectsFullName(List<Long> 
functionIds) {
+    // Nothing to look up: skip the database round trip entirely.
+    if (functionIds == null || functionIds.isEmpty()) {
+      return Maps.newHashMap();
+    }
+
+    List<FunctionPO> functionPOs =
+        SessionUtils.getWithoutCommit(
+            FunctionMetaMapper.class, mapper -> 
mapper.listFunctionPOsByFunctionIds(functionIds));
+
+    if (functionPOs == null || functionPOs.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    // Collect the parent schema IDs so their full names can be resolved in a single call.
+    List<Long> schemaIds =
+        
functionPOs.stream().map(FunctionPO::schemaId).collect(Collectors.toList());
+
+    Map<Long, String> schemaIdAndNameMap = getSchemaObjectsFullName(schemaIds);
+
+    HashMap<Long, String> functionIdAndNameMap = new HashMap<>();
+
+    functionPOs.forEach(
+        functionPO -> {
+          // The schema may have been deleted, so its full name can be missing; in that case
+          // the function's full name is recorded as null.
+          String schemaName = 
schemaIdAndNameMap.getOrDefault(functionPO.schemaId(), null);
+          if (schemaName == null) {
+            LOG.warn("The schema of function {} may be deleted", 
functionPO.functionId());
+            functionIdAndNameMap.put(functionPO.functionId(), null);
+            return;
+          }
+
+          String fullName = DOT_JOINER.join(schemaName, 
functionPO.functionName());
+          functionIdAndNameMap.put(functionPO.functionId(), fullName);
+        });
+
+    return functionIdAndNameMap;
+  }
+
   /**
    * Retrieves a map of Table object IDs to their full names.
    *
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
index 299099b2cc..42128cf287 100644
--- a/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/MetadataObjectUtil.java
@@ -60,6 +60,7 @@ public class MetadataObjectUtil {
           .put(MetadataObject.Type.JOB_TEMPLATE, 
Entity.EntityType.JOB_TEMPLATE)
           .put(MetadataObject.Type.JOB, Entity.EntityType.JOB)
           .put(MetadataObject.Type.VIEW, Entity.EntityType.VIEW)
+          .put(MetadataObject.Type.FUNCTION, Entity.EntityType.FUNCTION)
           .build();
 
   private MetadataObjectUtil() {}
@@ -129,6 +130,7 @@ public class MetadataObjectUtil {
       case FILESET:
       case COLUMN:
       case MODEL:
+      case FUNCTION:
         String fullName = DOT.join(metalakeName, metadataObject.fullName());
         return NameIdentifier.parse(fullName);
       default:
@@ -201,6 +203,11 @@ public class MetadataObjectUtil {
         check(env.modelDispatcher().modelExists(identifier), 
exceptionToThrowSupplier);
         break;
 
+      case FUNCTION:
+        NameIdentifierUtil.checkFunction(identifier);
+        check(env.functionDispatcher().functionExists(identifier), 
exceptionToThrowSupplier);
+        break;
+
       case VIEW:
         NameIdentifierUtil.checkView(identifier);
         check(env.viewDispatcher().viewExists(identifier), 
exceptionToThrowSupplier);
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 75f6718f6f..b7bc9b742d 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -653,6 +653,11 @@ public class NameIdentifierUtil {
         String modelParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
         return MetadataObjects.of(modelParent, ident.name(), 
MetadataObject.Type.MODEL);
 
+      case FUNCTION:
+        checkFunction(ident);
+        String functionParent = dot.join(ident.namespace().level(1), 
ident.namespace().level(2));
+        return MetadataObjects.of(functionParent, ident.name(), 
MetadataObject.Type.FUNCTION);
+
       case ROLE:
         AuthorizationUtils.checkRole(ident);
         return MetadataObjects.of(null, ident.name(), 
MetadataObject.Type.ROLE);
@@ -895,6 +900,7 @@ public class NameIdentifierUtil {
       case FILESET:
       case MODEL:
       case TOPIC:
+      case FUNCTION:
         return Entity.EntityType.SCHEMA;
 
       case SCHEMA:
diff --git 
a/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java 
b/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java
new file mode 100644
index 0000000000..f567d3764f
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/hook/TestFunctionHookDispatcher.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.hook;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.authorization.Owner;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.FunctionDispatcher;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@code FunctionHookDispatcher#registerFunction}.
+ *
+ * <p>Both tests temporarily replace the {@code ownerDispatcher} field of the {@link GravitinoEnv}
+ * singleton through reflection and restore the original value in a {@code finally} block.
+ */
+public class TestFunctionHookDispatcher {
+
+  /**
+   * Verifies that registering a function returns the delegate's result and sets an owner on the
+   * FUNCTION metadata object through the {@link OwnerDispatcher} installed in the environment.
+   */
+  @Test
+  public void testRegisterFunctionSetOwnerAfterRegister() throws Exception {
+    GravitinoEnv gravitinoEnv = GravitinoEnv.getInstance();
+    // Remember the current owner dispatcher so it can be put back after the test.
+    Object originalOwnerDispatcher = FieldUtils.readField(gravitinoEnv, 
"ownerDispatcher", true);
+
+    NameIdentifier functionIdentifier =
+        NameIdentifier.of("metalake1", "catalog1", "schema1", "func1");
+    FunctionDefinition[] definitions = new FunctionDefinition[] {};
+    FunctionDispatcher dispatcher = Mockito.mock(FunctionDispatcher.class);
+    Function registeredFunction = Mockito.mock(Function.class);
+    OwnerDispatcher ownerDispatcher = Mockito.mock(OwnerDispatcher.class);
+
+    // Stub the wrapped dispatcher so registration yields a known Function instance.
+    Mockito.when(
+            dispatcher.registerFunction(
+                Mockito.eq(functionIdentifier),
+                Mockito.eq("comment"),
+                Mockito.eq(FunctionType.SCALAR),
+                Mockito.eq(true),
+                Mockito.eq(definitions)))
+        .thenReturn(registeredFunction);
+
+    // Install the mock owner dispatcher into the singleton environment.
+    FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", ownerDispatcher, 
true);
+    try {
+      FunctionHookDispatcher hookDispatcher = new 
FunctionHookDispatcher(dispatcher);
+      Function result =
+          hookDispatcher.registerFunction(
+              functionIdentifier, "comment", FunctionType.SCALAR, true, 
definitions);
+
+      // The hook must hand back exactly what the wrapped dispatcher returned.
+      assertSame(registeredFunction, result);
+
+      // Capture the metadata object passed to setOwner to check its type and full name.
+      ArgumentCaptor<MetadataObject> metadataObjectCaptor =
+          ArgumentCaptor.forClass(MetadataObject.class);
+      Mockito.verify(ownerDispatcher)
+          .setOwner(
+              Mockito.eq("metalake1"),
+              metadataObjectCaptor.capture(),
+              Mockito.eq(AuthConstants.ANONYMOUS_USER),
+              Mockito.eq(Owner.Type.USER));
+      assertEquals(MetadataObject.Type.FUNCTION, 
metadataObjectCaptor.getValue().type());
+      assertEquals("catalog1.schema1.func1", 
metadataObjectCaptor.getValue().fullName());
+    } finally {
+      // Restore the singleton so other tests see the original owner dispatcher.
+      FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
originalOwnerDispatcher, true);
+    }
+  }
+
+  /**
+   * Verifies that registration still delegates to the wrapped dispatcher and returns its result
+   * when the environment's {@code ownerDispatcher} field is null.
+   */
+  @Test
+  public void testRegisterFunctionSucceedsWhenOwnerDispatcherIsDisabled() 
throws Exception {
+    GravitinoEnv gravitinoEnv = GravitinoEnv.getInstance();
+    // Remember the current owner dispatcher so it can be put back after the test.
+    Object originalOwnerDispatcher = FieldUtils.readField(gravitinoEnv, 
"ownerDispatcher", true);
+
+    NameIdentifier functionIdentifier =
+        NameIdentifier.of("metalake1", "catalog1", "schema1", "func1");
+    FunctionDefinition[] definitions = new FunctionDefinition[] {};
+    FunctionDispatcher dispatcher = Mockito.mock(FunctionDispatcher.class);
+    Function registeredFunction = Mockito.mock(Function.class);
+
+    // Stub the wrapped dispatcher so registration yields a known Function instance.
+    Mockito.when(
+            dispatcher.registerFunction(
+                Mockito.eq(functionIdentifier),
+                Mockito.eq("comment"),
+                Mockito.eq(FunctionType.SCALAR),
+                Mockito.eq(true),
+                Mockito.eq(definitions)))
+        .thenReturn(registeredFunction);
+
+    // A null owner dispatcher stands in for the "disabled" case named by this test.
+    FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", null, true);
+    try {
+      FunctionHookDispatcher hookDispatcher = new 
FunctionHookDispatcher(dispatcher);
+      Function result =
+          hookDispatcher.registerFunction(
+              functionIdentifier, "comment", FunctionType.SCALAR, true, 
definitions);
+
+      assertSame(registeredFunction, result);
+      Mockito.verify(dispatcher)
+          .registerFunction(functionIdentifier, "comment", 
FunctionType.SCALAR, true, definitions);
+    } finally {
+      // Restore the singleton so other tests see the original owner dispatcher.
+      FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", 
originalOwnerDispatcher, true);
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorageRelationCache.java
 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorageRelationCache.java
index 3c3bb95045..b5f2960d10 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorageRelationCache.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorageRelationCache.java
@@ -45,10 +45,18 @@ import org.apache.gravitino.cache.CaffeineEntityCache;
 import org.apache.gravitino.cache.EntityCacheKey;
 import org.apache.gravitino.cache.EntityCacheRelationKey;
 import org.apache.gravitino.cache.ReverseIndexCache;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.FunctionEntity;
 import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.RoleEntity;
@@ -57,6 +65,7 @@ import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.storage.relational.RelationalEntityStore;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -774,6 +783,130 @@ public class TestEntityStorageRelationCache extends 
AbstractEntityStorageTest {
     }
   }
 
+  /**
+   * Verifies that the tag relation of a function stays resolvable in both directions after the
+   * function is renamed and that, with the cache enabled, the reverse index no longer holds an
+   * entry for the old function identifier.
+   *
+   * @param type the storage type handed to {@code init} and {@code destroy}
+   * @param enableCache the value returned for {@code Configs.CACHE_ENABLED}
+   */
+  @ParameterizedTest
+  @MethodSource("storageProvider")
+  void testFunctionTagRelationCacheInvalidation(String type, boolean 
enableCache) throws Exception {
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+    init(type, config);
+
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
+      store.initialize(config);
+
+      // Build the metalake -> catalog -> schema hierarchy that will contain the function.
+      BaseMetalake metalake =
+          createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", 
auditInfo);
+      store.put(metalake, false);
+
+      CatalogEntity catalog =
+          createCatalog(
+              RandomIdGenerator.INSTANCE.nextId(),
+              NamespaceUtil.ofCatalog("metalake"),
+              "catalog",
+              auditInfo);
+      store.put(catalog, false);
+
+      SchemaEntity schema =
+          createSchemaEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              Namespace.of("metalake", "catalog"),
+              "schema",
+              auditInfo);
+      store.put(schema, false);
+
+      TagEntity tag =
+          TagEntity.builder()
+              .withId(RandomIdGenerator.INSTANCE.nextId())
+              .withNamespace(NameIdentifierUtil.ofTag("metalake", 
"tag1").namespace())
+              .withName("tag1")
+              .withAuditInfo(auditInfo)
+              .withProperties(Collections.emptyMap())
+              .build();
+      store.put(tag, false);
+
+      FunctionEntity function =
+          createFunctionEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              Namespace.of("metalake", "catalog", "schema"),
+              "function_old",
+              auditInfo);
+      store.put(function, false);
+
+      // Attach the tag to the function (one tag added, none removed).
+      SupportsRelationOperations relationOperations = 
(SupportsRelationOperations) store;
+      relationOperations.updateEntityRelations(
+          SupportsRelationOperations.Type.TAG_METADATA_OBJECT_REL,
+          function.nameIdentifier(),
+          Entity.EntityType.FUNCTION,
+          new NameIdentifier[] {tag.nameIdentifier()},
+          new NameIdentifier[] {});
+
+      // Function -> tags lookup under the original name.
+      List<TagEntity> tags =
+          relationOperations.listEntitiesByRelation(
+              SupportsRelationOperations.Type.TAG_METADATA_OBJECT_REL,
+              function.nameIdentifier(),
+              Entity.EntityType.FUNCTION,
+              true);
+      Assertions.assertEquals(1, tags.size());
+      Assertions.assertEquals(tag.name(), tags.get(0).name());
+
+      // Tag -> tagged objects lookup; the object is reported under its full name.
+      List<GenericEntity> taggedObjects =
+          relationOperations.listEntitiesByRelation(
+              SupportsRelationOperations.Type.TAG_METADATA_OBJECT_REL,
+              tag.nameIdentifier(),
+              Entity.EntityType.TAG,
+              true);
+      Assertions.assertEquals(1, taggedObjects.size());
+      Assertions.assertEquals(function.id(), taggedObjects.get(0).id());
+      Assertions.assertEquals("catalog.schema.function_old", 
taggedObjects.get(0).name());
+
+      // Rename by replacing the stored entity with one that keeps the id but has a new name.
+      FunctionEntity renamedFunction =
+          createFunctionEntity(
+              function.id(),
+              Namespace.of("metalake", "catalog", "schema"),
+              "function_new",
+              auditInfo);
+      store.update(
+          function.nameIdentifier(),
+          FunctionEntity.class,
+          Entity.EntityType.FUNCTION,
+          e -> renamedFunction);
+
+      // With the cache on, the reverse index must hold nothing for the old identifier.
+      if (enableCache && store instanceof RelationalEntityStore) {
+        RelationalEntityStore relationalEntityStore = (RelationalEntityStore) 
store;
+        if (relationalEntityStore.getCache() instanceof CaffeineEntityCache) {
+          CaffeineEntityCache cache = (CaffeineEntityCache) 
relationalEntityStore.getCache();
+          ReverseIndexCache reverseIndexCache = cache.getReverseIndex();
+          Assertions.assertNull(
+              reverseIndexCache.get(function.nameIdentifier(), 
Entity.EntityType.FUNCTION));
+        }
+      }
+
+      // Both lookup directions must reflect the new name after the rename.
+      List<TagEntity> tagsAfterRename =
+          relationOperations.listEntitiesByRelation(
+              SupportsRelationOperations.Type.TAG_METADATA_OBJECT_REL,
+              renamedFunction.nameIdentifier(),
+              Entity.EntityType.FUNCTION,
+              true);
+      Assertions.assertEquals(1, tagsAfterRename.size());
+      Assertions.assertEquals(tag.name(), tagsAfterRename.get(0).name());
+
+      List<GenericEntity> taggedObjectsAfterRename =
+          relationOperations.listEntitiesByRelation(
+              SupportsRelationOperations.Type.TAG_METADATA_OBJECT_REL,
+              tag.nameIdentifier(),
+              Entity.EntityType.TAG,
+              true);
+      Assertions.assertEquals(1, taggedObjectsAfterRename.size());
+      Assertions.assertEquals(renamedFunction.id(), 
taggedObjectsAfterRename.get(0).id());
+      Assertions.assertEquals(
+          "catalog.schema.function_new", 
taggedObjectsAfterRename.get(0).name());
+
+      // NOTE(review): this cleanup only runs when every assertion above passes; the other
+      // function cache tests in this class call destroy(type) from a finally block.
+      destroy(type);
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("storageProvider")
   void testViewNotIndexedInReverseCache(String type, boolean enableCache) 
throws Exception {
@@ -1068,4 +1201,285 @@ public class TestEntityStorageRelationCache extends 
AbstractEntityStorageTest {
       destroy(type);
     }
   }
+
+  /**
+   * Verifies that deleting a function evicts from the cache a role whose securable objects
+   * include that function, and that the role read back afterwards no longer lists any FUNCTION
+   * securable object.
+   *
+   * @param type the storage type handed to {@code init} and {@code destroy}
+   * @param enableCache the value returned for {@code Configs.CACHE_ENABLED}
+   */
+  @ParameterizedTest
+  @MethodSource("storageProvider")
+  void testRoleCacheEvictedOnFunctionDeletion(String type, boolean 
enableCache) throws Exception {
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+    init(type, config);
+
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
+      try {
+        store.initialize(config);
+
+        // Build the metalake -> catalog -> schema -> function hierarchy.
+        BaseMetalake metalake =
+            createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), 
"metalake", auditInfo);
+        store.put(metalake, false);
+
+        CatalogEntity catalog =
+            createCatalog(
+                RandomIdGenerator.INSTANCE.nextId(),
+                NamespaceUtil.ofCatalog("metalake"),
+                "catalog",
+                auditInfo);
+        store.put(catalog, false);
+
+        SchemaEntity schema =
+            createSchemaEntity(
+                RandomIdGenerator.INSTANCE.nextId(),
+                Namespace.of("metalake", "catalog"),
+                "schema",
+                auditInfo);
+        store.put(schema, false);
+
+        FunctionEntity function =
+            createFunctionEntity(
+                RandomIdGenerator.INSTANCE.nextId(),
+                Namespace.of("metalake", "catalog", "schema"),
+                "function1",
+                auditInfo);
+        store.put(function, false);
+
+        // The role is granted one privilege on each level: catalog, schema and function.
+        SecurableObject catalogObject =
+            SecurableObjects.ofCatalog(
+                "catalog", Lists.newArrayList(Privileges.UseCatalog.allow()));
+        SecurableObject schemaObject =
+            SecurableObjects.ofSchema(
+                catalogObject, "schema", 
Lists.newArrayList(Privileges.UseSchema.allow()));
+        SecurableObject functionObject =
+            SecurableObjects.ofFunction(
+                schemaObject, "function1", 
Lists.newArrayList(Privileges.ExecuteFunction.allow()));
+
+        RoleEntity role =
+            RoleEntity.builder()
+                .withId(RandomIdGenerator.INSTANCE.nextId())
+                .withName("functionRole")
+                .withNamespace(AuthorizationUtils.ofRoleNamespace("metalake"))
+                .withProperties(null)
+                .withAuditInfo(auditInfo)
+                .withSecurableObjects(
+                    Lists.newArrayList(catalogObject, schemaObject, 
functionObject))
+                .build();
+        store.put(role, false);
+
+        // Read role to populate cache; ROLE_SECURABLE_OBJECT_REVERSE_RULE writes:
+        // funcIdent:FUNCTION -> [roleIdent:ROLE] into the reverse index.
+        RoleEntity cachedRole =
+            store.get(role.nameIdentifier(), Entity.EntityType.ROLE, 
RoleEntity.class);
+        Assertions.assertEquals(3, cachedRole.securableObjects().size());
+
+        // With the cache on, the reverse index must map the function to the role.
+        if (enableCache && store instanceof RelationalEntityStore) {
+          RelationalEntityStore relationalEntityStore = 
(RelationalEntityStore) store;
+          if (relationalEntityStore.getCache() instanceof CaffeineEntityCache) 
{
+            ReverseIndexCache reverseIndex =
+                ((CaffeineEntityCache) 
relationalEntityStore.getCache()).getReverseIndex();
+            List<EntityCacheKey> reverseKeys =
+                reverseIndex.get(function.nameIdentifier(), 
Entity.EntityType.FUNCTION);
+            Assertions.assertNotNull(reverseKeys);
+            Assertions.assertTrue(
+                reverseKeys.stream().anyMatch(k -> 
k.identifier().equals(role.nameIdentifier())));
+          }
+        }
+
+        // Delete the function; BFS invalidation should evict the role from cache via the
+        // reverse index.
+        store.delete(function.nameIdentifier(), Entity.EntityType.FUNCTION, 
false);
+
+        // With the cache on, the role entry must be gone from the cache data.
+        if (enableCache && store instanceof RelationalEntityStore) {
+          RelationalEntityStore relationalEntityStore = 
(RelationalEntityStore) store;
+          if (relationalEntityStore.getCache() instanceof CaffeineEntityCache) 
{
+            CaffeineEntityCache cache = (CaffeineEntityCache) 
relationalEntityStore.getCache();
+            Assertions.assertNull(
+                cache
+                    .getCacheData()
+                    .getIfPresent(
+                        EntityCacheRelationKey.of(role.nameIdentifier(), 
Entity.EntityType.ROLE)));
+          }
+        }
+
+        // Re-read role; the function securable object should have been removed from DB via
+        // cascade.
+        RoleEntity reloadedRole =
+            store.get(role.nameIdentifier(), Entity.EntityType.ROLE, 
RoleEntity.class);
+        long functionSecurableObjects =
+            reloadedRole.securableObjects().stream()
+                .filter(so -> so.type() == MetadataObject.Type.FUNCTION)
+                .count();
+        Assertions.assertEquals(0, functionSecurableObjects);
+
+      } finally {
+        // Always tear the backend down, even when an assertion above fails.
+        destroy(type);
+      }
+    }
+  }
+
+  /**
+   * Verifies the OWNER_REL cache entry of a function across its life cycle: an empty result is
+   * cached, setting an owner makes the next query return that owner, and renaming the function
+   * drops the cache entry of the old identifier while the owner stays reachable via the new one.
+   *
+   * @param type the storage type handed to {@code init} and {@code destroy}
+   * @param enableCache the value returned for {@code Configs.CACHE_ENABLED}
+   */
+  @ParameterizedTest
+  @MethodSource("storageProvider")
+  void testFunctionOwnerRelationCacheInvalidation(String type, boolean 
enableCache)
+      throws Exception {
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(enableCache);
+    init(type, config);
+
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
+      try {
+        store.initialize(config);
+
+        // Build the metalake -> catalog -> schema -> function hierarchy.
+        BaseMetalake metalake =
+            createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), 
"metalake", auditInfo);
+        store.put(metalake, false);
+
+        CatalogEntity catalog =
+            createCatalog(
+                RandomIdGenerator.INSTANCE.nextId(),
+                NamespaceUtil.ofCatalog("metalake"),
+                "catalog",
+                auditInfo);
+        store.put(catalog, false);
+
+        SchemaEntity schema =
+            createSchemaEntity(
+                RandomIdGenerator.INSTANCE.nextId(),
+                Namespace.of("metalake", "catalog"),
+                "schema",
+                auditInfo);
+        store.put(schema, false);
+
+        FunctionEntity function =
+            createFunctionEntity(
+                RandomIdGenerator.INSTANCE.nextId(),
+                Namespace.of("metalake", "catalog", "schema"),
+                "function1",
+                auditInfo);
+        store.put(function, false);
+
+        SupportsRelationOperations relationOperations = 
(SupportsRelationOperations) store;
+
+        // 1. Query owner of function - should be empty initially and result is cached
+        List<UserEntity> owners =
+            relationOperations.listEntitiesByRelation(
+                SupportsRelationOperations.Type.OWNER_REL,
+                function.nameIdentifier(),
+                Entity.EntityType.FUNCTION,
+                true);
+        Assertions.assertTrue(owners.isEmpty());
+
+        // With the cache on, the empty owner list itself must be present in the cache.
+        if (enableCache && store instanceof RelationalEntityStore) {
+          RelationalEntityStore relationalEntityStore = 
(RelationalEntityStore) store;
+          if (relationalEntityStore.getCache() instanceof CaffeineEntityCache) 
{
+            CaffeineEntityCache cache = (CaffeineEntityCache) 
relationalEntityStore.getCache();
+            List<Entity> cachedOwners =
+                cache
+                    .getCacheData()
+                    .getIfPresent(
+                        EntityCacheRelationKey.of(
+                            function.nameIdentifier(),
+                            Entity.EntityType.FUNCTION,
+                            SupportsRelationOperations.Type.OWNER_REL));
+            Assertions.assertNotNull(cachedOwners);
+            Assertions.assertTrue(cachedOwners.isEmpty());
+          }
+        }
+
+        // 2. Set an owner; cache for OWNER_REL should be invalidated
+        UserEntity ownerUser =
+            createUserEntity(
+                RandomIdGenerator.INSTANCE.nextId(),
+                AuthorizationUtils.ofUserNamespace("metalake"),
+                "ownerUser",
+                auditInfo);
+        store.put(ownerUser, false);
+
+        relationOperations.insertRelation(
+            SupportsRelationOperations.Type.OWNER_REL,
+            function.nameIdentifier(),
+            Entity.EntityType.FUNCTION,
+            ownerUser.nameIdentifier(),
+            Entity.EntityType.USER,
+            true);
+
+        // 3. Query owner again - should return the owner and be re-cached
+        owners =
+            relationOperations.listEntitiesByRelation(
+                SupportsRelationOperations.Type.OWNER_REL,
+                function.nameIdentifier(),
+                Entity.EntityType.FUNCTION,
+                true);
+        Assertions.assertEquals(1, owners.size());
+        Assertions.assertEquals(ownerUser.name(), owners.get(0).name());
+
+        // 4. Rename the function; old OWNER_REL cache entry should be evicted via BFS
+        // invalidation
+        FunctionEntity renamedFunction =
+            createFunctionEntity(
+                function.id(),
+                Namespace.of("metalake", "catalog", "schema"),
+                "function_renamed",
+                auditInfo);
+        store.update(
+            function.nameIdentifier(),
+            FunctionEntity.class,
+            Entity.EntityType.FUNCTION,
+            e -> renamedFunction);
+
+        if (enableCache && store instanceof RelationalEntityStore) {
+          RelationalEntityStore relationalEntityStore = 
(RelationalEntityStore) store;
+          if (relationalEntityStore.getCache() instanceof CaffeineEntityCache) 
{
+            CaffeineEntityCache cache = (CaffeineEntityCache) 
relationalEntityStore.getCache();
+            // The old function's OWNER_REL cache should have been evicted by BFS invalidation
+            List<Entity> cachedOwners =
+                cache
+                    .getCacheData()
+                    .getIfPresent(
+                        EntityCacheRelationKey.of(
+                            function.nameIdentifier(),
+                            Entity.EntityType.FUNCTION,
+                            SupportsRelationOperations.Type.OWNER_REL));
+            Assertions.assertNull(cachedOwners);
+          }
+        }
+
+        // 5. Query owner via new name - should still return the correct owner
+        owners =
+            relationOperations.listEntitiesByRelation(
+                SupportsRelationOperations.Type.OWNER_REL,
+                renamedFunction.nameIdentifier(),
+                Entity.EntityType.FUNCTION,
+                true);
+        Assertions.assertEquals(1, owners.size());
+        Assertions.assertEquals(ownerUser.name(), owners.get(0).name());
+
+      } finally {
+        // Always tear the backend down, even when an assertion above fails.
+        destroy(type);
+      }
+    }
+  }
+
+  /**
+   * Builds a {@link FunctionEntity} test fixture with the given identity and fixed content: a
+   * non-deterministic SCALAR function with a single definition that takes an integer {@code
+   * param1} and a string {@code param2}, returns an integer and has one Spark SQL implementation.
+   *
+   * @param id the entity id
+   * @param namespace the namespace of the function
+   * @param name the function name
+   * @param auditInfo the audit info to attach
+   * @return the built function entity
+   */
+  private FunctionEntity createFunctionEntity(
+      Long id, Namespace namespace, String name, AuditInfo auditInfo) {
+    FunctionParam param1 = FunctionParams.of("param1", 
Types.IntegerType.get());
+    FunctionParam param2 = FunctionParams.of("param2", Types.StringType.get());
+    FunctionImpl functionImpl =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT param1 + 
1");
+    FunctionDefinition definition =
+        FunctionDefinitions.of(
+            new FunctionParam[] {param1, param2},
+            Types.IntegerType.get(),
+            new FunctionImpl[] {functionImpl});
+
+    return FunctionEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withComment("test function comment")
+        .withFunctionType(FunctionType.SCALAR)
+        .withDeterministic(false)
+        .withDefinitions(new FunctionDefinition[] {definition})
+        .withAuditInfo(auditInfo)
+        .build();
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 00a9a5aed4..ff388c3b23 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -41,6 +41,13 @@ import org.apache.gravitino.authorization.Privileges;
 import org.apache.gravitino.authorization.SecurableObject;
 import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.integration.test.util.CloseContainerExtension;
 import org.apache.gravitino.integration.test.util.PrintFuncNameExtension;
 import org.apache.gravitino.job.ShellJobTemplate;
@@ -49,6 +56,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.FunctionEntity;
 import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
@@ -64,6 +72,7 @@ import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.policy.Policy;
 import org.apache.gravitino.policy.PolicyContent;
 import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -548,6 +557,28 @@ public abstract class TestJDBCBackend {
         .build();
   }
 
+  protected FunctionEntity createFunctionEntity(
+      Long id, Namespace namespace, String name, AuditInfo auditInfo) {
+    FunctionParam param1 = FunctionParams.of("param1", 
Types.IntegerType.get());
+    FunctionParam param2 = FunctionParams.of("param2", Types.StringType.get());
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT param1 + 1");
+    FunctionDefinition definition =
+        FunctionDefinitions.of(
+            new FunctionParam[] {param1, param2},
+            Types.IntegerType.get(),
+            new FunctionImpl[] {impl});
+    return FunctionEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withComment("test function comment")
+        .withFunctionType(FunctionType.SCALAR)
+        .withDeterministic(false)
+        .withDefinitions(new FunctionDefinition[] {definition})
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
   protected void createParentEntities(
       String metalakeName, String catalogName, String schemaName, AuditInfo 
auditInfo)
       throws IOException {
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
index 6d87b04408..5d2471c9f3 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestFunctionMetaService.java
@@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -37,18 +39,15 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.authorization.AuthorizationUtils;
+import org.apache.gravitino.authorization.Privileges;
+import org.apache.gravitino.authorization.SecurableObject;
+import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
-import org.apache.gravitino.function.FunctionDefinition;
-import org.apache.gravitino.function.FunctionDefinitions;
-import org.apache.gravitino.function.FunctionImpl;
-import org.apache.gravitino.function.FunctionImpls;
-import org.apache.gravitino.function.FunctionParam;
-import org.apache.gravitino.function.FunctionParams;
-import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
-import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.FunctionEntity;
-import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.meta.RoleEntity;
+import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
 import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
@@ -114,6 +113,34 @@ public class TestFunctionMetaService extends 
TestJDBCBackend {
     assertEquals(function.deterministic(), loadedFunction.deterministic());
   }
 
+  @TestTemplate
+  public void testGetFunctionIdBySchemaIdAndFunctionName() throws IOException {
+    String functionName = GravitinoITUtils.genRandomName("test_function");
+    Namespace ns = NamespaceUtil.ofFunction(metalakeName, catalogName, 
schemaName);
+    FunctionEntity function =
+        createFunctionEntity(RandomIdGenerator.INSTANCE.nextId(), ns, 
functionName, AUDIT_INFO);
+    FunctionMetaService.getInstance().insertFunction(function, false);
+
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(metalakeName, catalogName, schemaName), 
Entity.EntityType.SCHEMA);
+    Long functionId =
+        FunctionMetaService.getInstance()
+            .getFunctionIdBySchemaIdAndFunctionName(schemaId, functionName);
+    assertEquals(function.id(), functionId);
+
+    assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            FunctionMetaService.getInstance()
+                .getFunctionIdBySchemaIdAndFunctionName(schemaId, functionName 
+ "_missing"));
+    assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            FunctionMetaService.getInstance()
+                .getFunctionIdBySchemaIdAndFunctionName(-1L, functionName));
+  }
+
   @TestTemplate
   public void testMultipleVersionsInStorage() throws IOException {
     // This test verifies that multiple versions are created in storage layer
@@ -222,6 +249,37 @@ public class TestFunctionMetaService extends 
TestJDBCBackend {
 
     FunctionMetaService.getInstance().insertFunction(function, false);
 
+    // Set up owner relation
+    UserEntity user =
+        createUserEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofUserNamespace(metalakeName),
+            "user1",
+            AUDIT_INFO);
+    backend.insert(user, false);
+    OwnerMetaService.getInstance()
+        .setOwner(function.nameIdentifier(), function.type(), 
user.nameIdentifier(), user.type());
+
+    // Set up role/securable object relation
+    SecurableObject schemaObject =
+        SecurableObjects.ofSchema(
+            SecurableObjects.ofCatalog(
+                catalogName, 
Lists.newArrayList(Privileges.UseCatalog.allow())),
+            schemaName,
+            Lists.newArrayList(Privileges.UseSchema.allow()));
+    SecurableObject functionObject =
+        SecurableObjects.ofFunction(
+            schemaObject, functionName, 
Lists.newArrayList(Privileges.ExecuteFunction.allow()));
+    RoleEntity role =
+        createRoleEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            AuthorizationUtils.ofRoleNamespace(metalakeName),
+            "role1",
+            AUDIT_INFO,
+            Lists.newArrayList(functionObject),
+            ImmutableMap.of());
+    RoleMetaService.getInstance().insertRole(role, false);
+
     NameIdentifier functionIdent =
         NameIdentifier.of(metalakeName, catalogName, schemaName, functionName);
     
assertTrue(FunctionMetaService.getInstance().deleteFunction(functionIdent));
@@ -230,6 +288,12 @@ public class TestFunctionMetaService extends 
TestJDBCBackend {
     assertThrows(
         NoSuchEntityException.class,
         () -> 
FunctionMetaService.getInstance().getFunctionByIdentifier(functionIdent));
+
+    // Verify owner relation is cleaned up
+    assertEquals(0, countActiveOwnerRelForMetadataObject(function.id(), 
"FUNCTION"));
+
+    // Verify securable object (role) relation is cleaned up
+    assertEquals(0, countActiveObjectRelForRole(role.id()));
   }
 
   @TestTemplate
@@ -462,27 +526,46 @@ public class TestFunctionMetaService extends 
TestJDBCBackend {
     assertTrue(loadedFunction.deterministic());
   }
 
-  private FunctionEntity createFunctionEntity(
-      Long id, Namespace namespace, String name, AuditInfo auditInfo) {
-    FunctionParam param1 = FunctionParams.of("param1", 
Types.IntegerType.get());
-    FunctionParam param2 = FunctionParams.of("param2", Types.StringType.get());
-    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"SELECT param1 + 1");
-    FunctionDefinition definition =
-        FunctionDefinitions.of(
-            new FunctionParam[] {param1, param2},
-            Types.IntegerType.get(),
-            new FunctionImpl[] {impl});
-
-    return FunctionEntity.builder()
-        .withId(id)
-        .withName(name)
-        .withNamespace(namespace)
-        .withComment("test function comment")
-        .withFunctionType(FunctionType.SCALAR)
-        .withDeterministic(false)
-        .withDefinitions(new FunctionDefinition[] {definition})
-        .withAuditInfo(auditInfo)
-        .build();
+  private int countActiveOwnerRelForMetadataObject(
+      Long metadataObjectId, String metadataObjectType) {
+    try (SqlSession sqlSession =
+            
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet rs =
+            statement.executeQuery(
+                String.format(
+                    "SELECT count(*) FROM owner_meta"
+                        + " WHERE metadata_object_id = %d AND 
metadata_object_type = '%s'"
+                        + " AND deleted_at = 0",
+                    metadataObjectId, metadataObjectType))) {
+      if (rs.next()) {
+        return rs.getInt(1);
+      }
+      throw new RuntimeException("No result for 
countActiveOwnerRelForMetadataObject");
+    } catch (SQLException e) {
+      throw new RuntimeException("SQL execution failed", e);
+    }
+  }
+
+  private int countActiveObjectRelForRole(Long roleId) {
+    try (SqlSession sqlSession =
+            
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet rs =
+            statement.executeQuery(
+                String.format(
+                    "SELECT count(*) FROM role_meta_securable_object"
+                        + " WHERE role_id = %d AND deleted_at = 0",
+                    roleId))) {
+      if (rs.next()) {
+        return rs.getInt(1);
+      }
+      throw new RuntimeException("No result for countActiveObjectRelForRole");
+    } catch (SQLException e) {
+      throw new RuntimeException("SQL execution failed", e);
+    }
   }
 
   private Map<Integer, Long> listFunctionVersions(Long functionId) {
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestMetadataObjectService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestMetadataObjectService.java
index 4d14e21b5c..2728a1ee2b 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestMetadataObjectService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestMetadataObjectService.java
@@ -24,14 +24,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
 import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FunctionEntity;
 import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.meta.PolicyEntity;
@@ -39,7 +52,11 @@ import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.policy.PolicyContent;
 import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.session.SqlSession;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 
@@ -282,4 +299,119 @@ public class TestMetadataObjectService extends 
TestJDBCBackend {
     // Verify all are TABLE type
     assertTrue(metadataObjects.stream().allMatch(obj -> obj.type() == 
MetadataObject.Type.TABLE));
   }
+
+  @TestTemplate
+  public void testBatchGetFunctionObjectsFullName() throws IOException {
+    String catalogName = "test_function_catalog";
+    String schemaName = "test_function_schema";
+    String functionOne = "function_one";
+    String functionTwo = "function_two";
+    Namespace namespace = Namespace.of(METALAKE_NAME, catalogName, schemaName);
+    createAndInsertCatalog(METALAKE_NAME, catalogName);
+    createAndInsertSchema(METALAKE_NAME, catalogName, schemaName);
+
+    FunctionEntity functionEntityOne = 
createAndInsertFunctionEntity(namespace, functionOne);
+    FunctionEntity functionEntityTwo = 
createAndInsertFunctionEntity(namespace, functionTwo);
+
+    List<GenericEntity> functionEntities =
+        List.of(
+            GenericEntity.builder()
+                .withId(functionEntityOne.id())
+                .withName(functionEntityOne.name())
+                .withNamespace(functionEntityOne.namespace())
+                .withEntityType(Entity.EntityType.FUNCTION)
+                .build(),
+            GenericEntity.builder()
+                .withId(functionEntityTwo.id())
+                .withName(functionEntityTwo.name())
+                .withNamespace(functionEntityTwo.namespace())
+                .withEntityType(Entity.EntityType.FUNCTION)
+                .build());
+
+    List<MetadataObject> metadataObjects =
+        MetadataObjectService.fromGenericEntities(functionEntities);
+
+    assertEquals(2, metadataObjects.size());
+    assertTrue(
+        metadataObjects.stream().allMatch(obj -> obj.type() == 
MetadataObject.Type.FUNCTION));
+
+    Set<String> functionFullNames =
+        
metadataObjects.stream().map(MetadataObject::fullName).collect(Collectors.toSet());
+    assertEquals(
+        Set.of(
+            catalogName + "." + schemaName + "." + functionOne,
+            catalogName + "." + schemaName + "." + functionTwo),
+        functionFullNames);
+  }
+
+  @TestTemplate
+  public void testBatchGetFunctionObjectsFullNameWithMissingSchema() throws 
IOException {
+    String catalogName = "test_function_catalog_missing_parent";
+    String schemaName = "test_function_schema_missing_parent";
+    String functionName = "function_with_missing_schema";
+    Namespace namespace = Namespace.of(METALAKE_NAME, catalogName, schemaName);
+    createAndInsertCatalog(METALAKE_NAME, catalogName);
+    createAndInsertSchema(METALAKE_NAME, catalogName, schemaName);
+
+    FunctionEntity functionEntity = createAndInsertFunctionEntity(namespace, 
functionName);
+    Long schemaId =
+        EntityIdService.getEntityId(
+            NameIdentifier.of(METALAKE_NAME, catalogName, schemaName), 
Entity.EntityType.SCHEMA);
+    softDeleteSchemaMeta(schemaId);
+
+    List<GenericEntity> functionEntities =
+        List.of(
+            GenericEntity.builder()
+                .withId(functionEntity.id())
+                .withName(functionEntity.name())
+                .withNamespace(functionEntity.namespace())
+                .withEntityType(Entity.EntityType.FUNCTION)
+                .build());
+
+    List<MetadataObject> metadataObjects =
+        MetadataObjectService.fromGenericEntities(functionEntities);
+
+    assertTrue(metadataObjects.isEmpty());
+  }
+
+  private FunctionEntity createAndInsertFunctionEntity(Namespace namespace, 
String functionName)
+      throws IOException {
+    FunctionEntity functionEntity =
+        FunctionEntity.builder()
+            .withId(RandomIdGenerator.INSTANCE.nextId())
+            .withName(functionName)
+            .withNamespace(namespace)
+            .withComment("function comment")
+            .withFunctionType(FunctionType.SCALAR)
+            .withDeterministic(true)
+            .withDefinitions(new FunctionDefinition[] 
{createFunctionDefinition()})
+            .withAuditInfo(AUDIT_INFO)
+            .build();
+    backend.insert(functionEntity, false);
+    return functionEntity;
+  }
+
+  private FunctionDefinition createFunctionDefinition() {
+    FunctionParam inputParam = FunctionParams.of("param1", 
Types.IntegerType.get());
+    FunctionImpl functionImpl =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "SELECT param1 + 
1");
+    return FunctionDefinitions.of(
+        new FunctionParam[] {inputParam},
+        Types.IntegerType.get(),
+        new FunctionImpl[] {functionImpl});
+  }
+
+  private void softDeleteSchemaMeta(Long schemaId) {
+    try (SqlSession sqlSession =
+            
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "UPDATE schema_meta SET deleted_at = %d WHERE schema_id = %d",
+              Instant.now().toEpochMilli(), schemaId));
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to soft delete schema metadata for 
test setup", e);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
index 541095f782..51b6c9d0d8 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestOwnerMetaService.java
@@ -37,6 +37,7 @@ import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.FunctionEntity;
 import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.GroupEntity;
 import org.apache.gravitino.meta.ModelEntity;
@@ -254,6 +255,13 @@ class TestOwnerMetaService extends TestJDBCBackend {
             Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
             "view");
     backend.insert(view, false);
+    FunctionEntity function =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function, false);
 
     UserEntity user =
         createUserEntity(
@@ -277,43 +285,50 @@ class TestOwnerMetaService extends TestJDBCBackend {
         .setOwner(model.nameIdentifier(), model.type(), user.nameIdentifier(), 
user.type());
     OwnerMetaService.getInstance()
         .setOwner(view.nameIdentifier(), view.type(), user.nameIdentifier(), 
user.type());
+    OwnerMetaService.getInstance()
+        .setOwner(function.nameIdentifier(), function.type(), 
user.nameIdentifier(), user.type());
 
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
-    Assertions.assertEquals(7, countActiveOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(8, countActiveOwnerRel(user.id()));
 
     // Test to delete view
     ViewMetaService.getInstance().deleteView(view.nameIdentifier());
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
-    Assertions.assertEquals(6, countActiveOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(7, countActiveOwnerRel(user.id()));
 
     // Test to delete model
     ModelMetaService.getInstance().deleteModel(model.nameIdentifier());
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
-    Assertions.assertEquals(5, countActiveOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(6, countActiveOwnerRel(user.id()));
 
     // Test to delete table
     TableMetaService.getInstance().deleteTable(table.nameIdentifier());
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
-    Assertions.assertEquals(4, countActiveOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(5, countActiveOwnerRel(user.id()));
 
     // Test to delete topic
     TopicMetaService.getInstance().deleteTopic(topic.nameIdentifier());
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
-    Assertions.assertEquals(3, countActiveOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(4, countActiveOwnerRel(user.id()));
 
     // Test to delete fileset
     FilesetMetaService.getInstance().deleteFileset(fileset.nameIdentifier());
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(3, countActiveOwnerRel(user.id()));
+
+    // Test to delete function
+    
FunctionMetaService.getInstance().deleteFunction(function.nameIdentifier());
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
     Assertions.assertEquals(2, countActiveOwnerRel(user.id()));
 
     // Test to delete schema
     SchemaMetaService.getInstance().deleteSchema(schema.nameIdentifier(), 
false);
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
     Assertions.assertEquals(1, countActiveOwnerRel(user.id()));
 
     // Test to delete catalog
     CatalogMetaService.getInstance().deleteCatalog(catalog.nameIdentifier(), 
false);
-    Assertions.assertEquals(7, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(8, countAllOwnerRel(user.id()));
     Assertions.assertEquals(0, countActiveOwnerRel(user.id()));
 
     // Test to delete catalog with cascade mode
@@ -370,6 +385,13 @@ class TestOwnerMetaService extends TestJDBCBackend {
             Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
             "view");
     backend.insert(view, false);
+    FunctionEntity function2 =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function2, false);
 
     OwnerMetaService.getInstance()
         .setOwner(catalog.nameIdentifier(), catalog.type(), 
user.nameIdentifier(), user.type());
@@ -385,9 +407,11 @@ class TestOwnerMetaService extends TestJDBCBackend {
         .setOwner(model.nameIdentifier(), model.type(), user.nameIdentifier(), 
user.type());
     OwnerMetaService.getInstance()
         .setOwner(view.nameIdentifier(), view.type(), user.nameIdentifier(), 
user.type());
+    OwnerMetaService.getInstance()
+        .setOwner(function2.nameIdentifier(), function2.type(), 
user.nameIdentifier(), user.type());
 
     CatalogMetaService.getInstance().deleteCatalog(catalog.nameIdentifier(), 
true);
-    Assertions.assertEquals(14, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(16, countAllOwnerRel(user.id()));
     Assertions.assertEquals(0, countActiveOwnerRel(user.id()));
 
     // Test to delete schema with cascade mode
@@ -448,6 +472,13 @@ class TestOwnerMetaService extends TestJDBCBackend {
             Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
             "view");
     backend.insert(view, false);
+    FunctionEntity function3 =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function3, false);
 
     OwnerMetaService.getInstance()
         .setOwner(schema.nameIdentifier(), schema.type(), 
user.nameIdentifier(), user.type());
@@ -463,9 +494,11 @@ class TestOwnerMetaService extends TestJDBCBackend {
         .setOwner(model.nameIdentifier(), model.type(), user.nameIdentifier(), 
user.type());
     OwnerMetaService.getInstance()
         .setOwner(view.nameIdentifier(), view.type(), user.nameIdentifier(), 
user.type());
+    OwnerMetaService.getInstance()
+        .setOwner(function3.nameIdentifier(), function3.type(), 
user.nameIdentifier(), user.type());
 
     SchemaMetaService.getInstance().deleteSchema(schema.nameIdentifier(), 
true);
-    Assertions.assertEquals(21, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(24, countAllOwnerRel(user.id()));
     Assertions.assertEquals(1, countActiveOwnerRel(user.id()));
 
     // Test to delete user
@@ -516,6 +549,13 @@ class TestOwnerMetaService extends TestJDBCBackend {
             Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
             "view");
     backend.insert(view, false);
+    FunctionEntity function4 =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(TestOwnerMetaService.METALAKE_NAME, CATALOG_NAME, 
SCHEMA_NAME),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function4, false);
 
     OwnerMetaService.getInstance()
         .setOwner(schema.nameIdentifier(), schema.type(), 
user.nameIdentifier(), user.type());
@@ -531,9 +571,11 @@ class TestOwnerMetaService extends TestJDBCBackend {
         .setOwner(model.nameIdentifier(), model.type(), user.nameIdentifier(), 
user.type());
     OwnerMetaService.getInstance()
         .setOwner(view.nameIdentifier(), view.type(), user.nameIdentifier(), 
user.type());
+    OwnerMetaService.getInstance()
+        .setOwner(function4.nameIdentifier(), function4.type(), 
user.nameIdentifier(), user.type());
 
     UserMetaService.getInstance().deleteUser(user.nameIdentifier());
-    Assertions.assertEquals(28, countAllOwnerRel(user.id()));
+    Assertions.assertEquals(32, countAllOwnerRel(user.id()));
     Assertions.assertEquals(0, countActiveOwnerRel(user.id()));
   }
 
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
index 0867346b74..b93d533b24 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSecurableObjects.java
@@ -37,6 +37,7 @@ import org.apache.gravitino.authorization.SecurableObjects;
 import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.FunctionEntity;
 import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.RoleEntity;
@@ -226,6 +227,13 @@ public class TestSecurableObjects extends TestJDBCBackend {
             Namespace.of("metalake", "catalog", "schema"),
             "view");
     backend.insert(view, false);
+    FunctionEntity function =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function, false);
 
     SecurableObject catalogObject =
         SecurableObjects.ofCatalog(
@@ -250,6 +258,9 @@ public class TestSecurableObjects extends TestJDBCBackend {
     SecurableObject viewObject =
         SecurableObjects.ofView(
             schemaObject, "view", 
Lists.newArrayList(Privileges.SelectTable.allow()));
+    SecurableObject functionObject =
+        SecurableObjects.ofFunction(
+            schemaObject, "function", 
Lists.newArrayList(Privileges.ExecuteFunction.allow()));
 
     RoleEntity role1 =
         createRoleEntity(
@@ -264,48 +275,54 @@ public class TestSecurableObjects extends TestJDBCBackend 
{
                 filesetObject,
                 topicObject,
                 modelObject,
-                viewObject),
+                viewObject,
+                functionObject),
             ImmutableMap.of("k1", "v1"));
 
     roleMetaService.insertRole(role1, false);
 
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
-    Assertions.assertEquals(7, countActiveObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(8, countActiveObjectRel(role1.id()));
 
     // Test to delete view
     ViewMetaService.getInstance()
         .deleteView(NameIdentifier.of("metalake", "catalog", "schema", 
"view"));
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
-    Assertions.assertEquals(6, countActiveObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(7, countActiveObjectRel(role1.id()));
 
     // Test to delete model
     ModelMetaService.getInstance().deleteModel(model.nameIdentifier());
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
-    Assertions.assertEquals(5, countActiveObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(6, countActiveObjectRel(role1.id()));
 
     // Test to delete table
     TableMetaService.getInstance().deleteTable(table.nameIdentifier());
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
-    Assertions.assertEquals(4, countActiveObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(5, countActiveObjectRel(role1.id()));
 
     // Test to delete topic
     TopicMetaService.getInstance().deleteTopic(topic.nameIdentifier());
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
-    Assertions.assertEquals(3, countActiveObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(4, countActiveObjectRel(role1.id()));
 
     // Test to delete fileset
     FilesetMetaService.getInstance().deleteFileset(fileset.nameIdentifier());
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(3, countActiveObjectRel(role1.id()));
+
+    // Test to delete function
+    
FunctionMetaService.getInstance().deleteFunction(function.nameIdentifier());
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
     Assertions.assertEquals(2, countActiveObjectRel(role1.id()));
 
     // Test to delete schema
     SchemaMetaService.getInstance().deleteSchema(schema.nameIdentifier(), 
false);
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
     Assertions.assertEquals(1, countActiveObjectRel(role1.id()));
 
     // Test to delete catalog
     CatalogMetaService.getInstance().deleteCatalog(catalog.nameIdentifier(), 
false);
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
     Assertions.assertEquals(0, countActiveObjectRel(role1.id()));
 
     roleMetaService.deleteRole(role1.nameIdentifier());
@@ -364,6 +381,13 @@ public class TestSecurableObjects extends TestJDBCBackend {
             Namespace.of("metalake", "catalog", "schema"),
             "view");
     backend.insert(view, false);
+    FunctionEntity function2 =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function2, false);
 
     role1 =
         createRoleEntity(
@@ -378,13 +402,14 @@ public class TestSecurableObjects extends TestJDBCBackend {
                 filesetObject,
                 topicObject,
                 modelObject,
-                viewObject),
+                viewObject,
+                functionObject),
             ImmutableMap.of("k1", "v1"));
 
     roleMetaService.insertRole(role1, false);
 
     CatalogMetaService.getInstance().deleteCatalog(catalog.nameIdentifier(), true);
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
     Assertions.assertEquals(0, countActiveObjectRel(role1.id()));
 
     roleMetaService.deleteRole(role1.nameIdentifier());
@@ -441,6 +466,13 @@ public class TestSecurableObjects extends TestJDBCBackend {
             Namespace.of("metalake", "catalog", "schema"),
             "view");
     backend.insert(view, false);
+    FunctionEntity function3 =
+        createFunctionEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "function",
+            AUDIT_INFO);
+    backend.insert(function3, false);
 
     role1 =
         createRoleEntity(
@@ -455,13 +487,14 @@ public class TestSecurableObjects extends TestJDBCBackend {
                 filesetObject,
                 topicObject,
                 modelObject,
-                viewObject),
+                viewObject,
+                functionObject),
             ImmutableMap.of("k1", "v1"));
 
     roleMetaService.insertRole(role1, false);
 
     SchemaMetaService.getInstance().deleteSchema(schema.nameIdentifier(), true);
-    Assertions.assertEquals(7, countAllObjectRel(role1.id()));
+    Assertions.assertEquals(8, countAllObjectRel(role1.id()));
     Assertions.assertEquals(1, countActiveObjectRel(role1.id()));
   }
 

Reply via email to