This is an automated email from the ASF dual-hosted git repository.

diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 9809eed5c9 [#9533] feat(trino-connector): Add UDF adaptation for Trino 
connector (#10494)
9809eed5c9 is described below

commit 9809eed5c923e6e4df2bf0c310f122bc4c6b1f87
Author: akshay thorat <[email protected]>
AuthorDate: Wed Apr 1 03:13:19 2026 -0700

    [#9533] feat(trino-connector): Add UDF adaptation for Trino connector 
(#10494)
    
    ### What changes were proposed in this pull request?
    Add support for consuming Gravitino function metadata in the Trino
    connector by mapping SQL functions with RuntimeType.TRINO to Trino's
    built-in LanguageFunction API.
    
    ### Why are the changes needed?
    - CatalogConnectorMetadata: Add FunctionCatalog support with graceful
    fallback when catalog does not support functions. Add methods to list
    and retrieve functions from Gravitino server.
    - GravitinoMetadata: Implement listLanguageFunctions() and
    getLanguageFunctions() to convert Gravitino SQL functions with TRINO
    runtime to Trino LanguageFunction instances.
    - TestGravitinoMetadataFunction: 8 unit tests covering listing,
    filtering, multiple definitions, signature tokens, and edge cases.
    - TestCatalogConnectorMetadataFunction: 7 unit tests covering function
    catalog support detection, listing, retrieval, and error handling.
    
    Fix #9533
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    
    Added test cases.
---
 docs/trino-connector/index.md                      |   1 +
 docs/trino-connector/udf-support.md                |  64 ++++
 .../connector/integration/test/TrinoUDFIT.java     | 329 +++++++++++++++++++++
 .../trino/connector/GravitinoMetadata.java         |  94 ++++++
 .../catalog/CatalogConnectorMetadata.java          |  58 ++++
 .../connector/TestGravitinoMetadataFunction.java   | 265 +++++++++++++++++
 .../TestCatalogConnectorMetadataFunction.java      | 150 ++++++++++
 7 files changed, 961 insertions(+)

diff --git a/docs/trino-connector/index.md b/docs/trino-connector/index.md
index 95e239a080..5afb35a41b 100644
--- a/docs/trino-connector/index.md
+++ b/docs/trino-connector/index.md
@@ -18,4 +18,5 @@ Apache Gravitino Trino connector index:
     - [PostgreSQL](catalog-postgresql.md)
   - [Trino cascading query](trino-cascading-query.md)
   - [Supported SQL](sql-support.md)
+  - [UDF support](udf-support.md)
   - [Development](development.md)
diff --git a/docs/trino-connector/udf-support.md 
b/docs/trino-connector/udf-support.md
new file mode 100644
index 0000000000..10dfd5aa02
--- /dev/null
+++ b/docs/trino-connector/udf-support.md
@@ -0,0 +1,64 @@
+---
+title: "Apache Gravitino Trino connector UDF support"
+slug: /trino-connector/udf-support
+keyword: gravitino connector trino udf function
+license: "This software is licensed under the Apache License version 2."
+---
+
+The Gravitino Trino connector supports user-defined functions (UDFs) 
registered in Apache Gravitino.
+Functions with `RuntimeType.TRINO` and SQL language implementations are 
automatically exposed as
+[Trino language 
functions](https://trino.io/docs/current/routines/function.html), making them 
available for use in Trino queries.
+
+## How it works
+
+When Gravitino catalogs contain registered functions, the Trino connector:
+
+1. Lists functions from the Gravitino server for each schema.
+2. Filters to include only functions with `RuntimeType.TRINO` and 
`Language.SQL`.
+3. Maps each function implementation to a Trino `LanguageFunction` with a 
signature token derived from the function name and parameter types.
+
+Functions registered with other runtimes (e.g., `SPARK`) are **not** visible 
in Trino.
+
+## Prerequisites
+
+- The Gravitino catalog must support function operations (i.e., implement 
`FunctionCatalog`).
+- Functions must be registered in Gravitino via the Gravitino client or REST 
API before they can be queried from Trino.
+
+## Registering a UDF
+
+Use the Gravitino Java client to register a function:
+
+```java
+FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+functionCatalog.registerFunction(
+    NameIdentifier.of("my_schema", "add_one"),
+    "Adds one to input",
+    FunctionType.SCALAR,
+    true,
+    FunctionDefinitions.of(
+        FunctionDefinitions.of(
+            FunctionParams.of(FunctionParams.of("x", Types.IntegerType.get())),
+            Types.IntegerType.get(),
+            FunctionImpls.of(
+                FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x 
+ 1")))));
+```
+
+## Querying UDFs from Trino
+
+Once registered, the function appears in Trino:
+
+```sql
+-- List available functions in a schema
+SHOW FUNCTIONS FROM catalog.my_schema;
+
+-- Invoke the function
+SELECT catalog.my_schema.add_one(5);
+-- Returns: 6
+```
+
+## Limitations
+
+- **Read-only**: The Trino connector currently supports listing and invoking 
Gravitino UDFs. Creating or dropping functions via Trino SQL (`CREATE FUNCTION` 
/ `DROP FUNCTION`) is not yet supported.
+- **SQL only**: Only SQL-language implementations are mapped. Java and Python 
implementations are not exposed to Trino.
+- **TRINO runtime only**: Only functions with `RuntimeType.TRINO` are visible. 
Functions registered with `RuntimeType.SPARK` or other runtimes are filtered 
out.
+- **Type mapping**: Function parameter and return types are converted from 
Gravitino types to Trino types. Unsupported types will cause the function to be 
skipped with a warning log.
diff --git 
a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java
 
b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java
new file mode 100644
index 0000000000..a5d57fc364
--- /dev/null
+++ 
b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.integration.test;
+
+import static java.lang.Thread.sleep;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for Trino connector UDF adaptation. Verifies that 
functions registered in
+ * Gravitino with TRINO runtime are visible via Trino's language function API.
+ */
+@Tag("gravitino-docker-test")
+public class TrinoUDFIT extends TrinoQueryITBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrinoUDFIT.class);
+
+  private static final String CATALOG_NAME = "gt_hive_udf";
+  private static final String SCHEMA_NAME = "gt_udf_schema";
+  private static Catalog catalog;
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    TrinoUDFIT instance = new TrinoUDFIT();
+    instance.setup();
+
+    createHiveCatalog();
+    createSchema();
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    try {
+      cleanupFunctionsAndSchema();
+      dropCatalog(CATALOG_NAME);
+    } catch (Exception e) {
+      LOG.error("Error during teardown", e);
+    }
+    TrinoQueryITBase.cleanup();
+  }
+
+  private static void createHiveCatalog() throws Exception {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("metastore.uris", hiveMetastoreUri);
+
+    boolean exists = metalake.catalogExists(CATALOG_NAME);
+    if (!exists) {
+      metalake.createCatalog(
+          CATALOG_NAME, Catalog.Type.RELATIONAL, "hive", "UDF test catalog", 
properties);
+    }
+
+    // Wait for catalog to sync to Trino
+    boolean catalogReady = false;
+    int tries = 180;
+    while (!catalogReady && tries-- >= 0) {
+      try {
+        String result = trinoQueryRunner.runQuery("show catalogs");
+        if (result.contains(metalakeName + "." + CATALOG_NAME)) {
+          catalogReady = true;
+          break;
+        }
+      } catch (Exception e) {
+        LOG.info("Waiting for catalog to sync to Trino");
+      }
+      sleep(1000);
+    }
+
+    if (!catalogReady) {
+      throw new Exception("Catalog " + CATALOG_NAME + " sync timeout");
+    }
+
+    catalog = metalake.loadCatalog(CATALOG_NAME);
+  }
+
+  private static void createSchema() {
+    boolean exists = catalog.asSchemas().schemaExists(SCHEMA_NAME);
+    if (!exists) {
+      catalog.asSchemas().createSchema(SCHEMA_NAME, "UDF test schema", 
Collections.emptyMap());
+    }
+  }
+
+  private static void cleanupFunctionsAndSchema() {
+    try {
+      FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+      NameIdentifier[] functions = 
functionCatalog.listFunctions(Namespace.of(SCHEMA_NAME));
+      for (NameIdentifier fn : functions) {
+        functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, 
fn.name()));
+      }
+    } catch (Exception e) {
+      LOG.error("Error cleaning up functions", e);
+    }
+
+    try {
+      catalog.asSchemas().dropSchema(SCHEMA_NAME, false);
+    } catch (Exception e) {
+      LOG.error("Error dropping schema", e);
+    }
+  }
+
+  @Test
+  public void testListLanguageFunctionsShowsRegisteredUDF() throws Exception {
+    String functionName = "test_add_one";
+    FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+    // Register a scalar function: test_add_one(x INTEGER) -> INTEGER
+    // Uses TRINO runtime + SQL language so it maps to a Trino LanguageFunction
+    // SQL body "RETURN x + 1" adds 1 to the input integer
+    Function function =
+        functionCatalog.registerFunction(
+            NameIdentifier.of(SCHEMA_NAME, functionName),
+            "Adds one to input",
+            FunctionType.SCALAR,
+            true,
+            FunctionDefinitions.of(
+                FunctionDefinitions.of(
+                    FunctionParams.of(FunctionParams.of("x", 
Types.IntegerType.get())),
+                    Types.IntegerType.get(),
+                    FunctionImpls.of(
+                        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, 
"RETURN x + 1")))));
+    Assertions.assertNotNull(function);
+
+    // Query Trino to verify the function is listed
+    String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+    String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, 
SCHEMA_NAME);
+    String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+    LOG.info("SHOW FUNCTIONS result: {}", result);
+    Assertions.assertTrue(
+        result.contains(functionName),
+        "Expected function " + functionName + " to be listed. Got: " + result);
+
+    // Cleanup
+    functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+  }
+
+  @Test
+  public void testSelectUDFReturnsCorrectResult() throws Exception {
+    String functionName = "test_add_five";
+    FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+    // Register a scalar function: test_add_five(x INTEGER) -> INTEGER
+    // SQL body "RETURN x + 5" adds 5 to the input integer
+    Function function =
+        functionCatalog.registerFunction(
+            NameIdentifier.of(SCHEMA_NAME, functionName),
+            "Adds five to input",
+            FunctionType.SCALAR,
+            true,
+            FunctionDefinitions.of(
+                FunctionDefinitions.of(
+                    FunctionParams.of(FunctionParams.of("x", 
Types.IntegerType.get())),
+                    Types.IntegerType.get(),
+                    FunctionImpls.of(
+                        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, 
"RETURN x + 5")))));
+    Assertions.assertNotNull(function);
+
+    // Invoke the function via SELECT and verify the result
+    String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+    String selectQuery =
+        String.format("SELECT %s.%s.%s(5)", trinoCatalogName, SCHEMA_NAME, 
functionName);
+    String result = trinoQueryRunner.runQuery(selectQuery);
+
+    LOG.info("SELECT result: {}", result);
+    // Parse the query result and verify the exact numeric output
+    String trimmedResult = result.trim();
+    Assertions.assertTrue(
+        trimmedResult.contains("10"),
+        "Expected SELECT test_add_five(5) to return 10. Got: " + 
trimmedResult);
+    Assertions.assertFalse(
+        trimmedResult.contains("100"),
+        "Result should be exactly 10, not a number containing 10. Got: " + 
trimmedResult);
+
+    // Cleanup
+    functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+  }
+
+  @Test
+  public void testListLanguageFunctionsFiltersNonTrinoRuntime() throws 
Exception {
+    String functionName = "spark_only_func";
+    FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+    // Register a scalar function: spark_only_func(x INTEGER) -> INTEGER
+    // Uses SPARK runtime, so this should NOT be visible in Trino
+    Function function =
+        functionCatalog.registerFunction(
+            NameIdentifier.of(SCHEMA_NAME, functionName),
+            "Spark-only function",
+            FunctionType.SCALAR,
+            true,
+            FunctionDefinitions.of(
+                FunctionDefinitions.of(
+                    FunctionParams.of(FunctionParams.of("x", 
Types.IntegerType.get())),
+                    Types.IntegerType.get(),
+                    FunctionImpls.of(
+                        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, 
"RETURN x + 1")))));
+    Assertions.assertNotNull(function);
+
+    // Query Trino - SPARK runtime function should be filtered out
+    String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+    String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, 
SCHEMA_NAME);
+    String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+    LOG.info("SHOW FUNCTIONS result (should not contain spark_only_func): {}", 
result);
+    Assertions.assertFalse(
+        result.contains(functionName),
+        "SPARK runtime function should not appear in Trino. Got: " + result);
+
+    // Cleanup
+    functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+  }
+
+  @Test
+  public void testMultipleUDFsInSameSchema() throws Exception {
+    String func1Name = "udf_multiply";
+    String func2Name = "udf_concat";
+    FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+    // Register two TRINO SQL functions:
+    // udf_multiply(x INTEGER) -> INTEGER: multiplies input by 2
+    // udf_concat(a STRING, b STRING) -> STRING: concatenates two strings
+    functionCatalog.registerFunction(
+        NameIdentifier.of(SCHEMA_NAME, func1Name),
+        "Multiply by 2",
+        FunctionType.SCALAR,
+        true,
+        FunctionDefinitions.of(
+            FunctionDefinitions.of(
+                FunctionParams.of(FunctionParams.of("x", 
Types.IntegerType.get())),
+                Types.IntegerType.get(),
+                FunctionImpls.of(
+                    FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, 
"RETURN x * 2")))));
+
+    functionCatalog.registerFunction(
+        NameIdentifier.of(SCHEMA_NAME, func2Name),
+        "Concat strings",
+        FunctionType.SCALAR,
+        true,
+        FunctionDefinitions.of(
+            FunctionDefinitions.of(
+                FunctionParams.of(
+                    FunctionParams.of("a", Types.StringType.get()),
+                    FunctionParams.of("b", Types.StringType.get())),
+                Types.StringType.get(),
+                FunctionImpls.of(
+                    FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, 
"RETURN concat(a, b)")))));
+
+    // Query Trino to verify both functions are listed
+    String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+    String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, 
SCHEMA_NAME);
+    String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+    LOG.info("SHOW FUNCTIONS result: {}", result);
+    Assertions.assertTrue(
+        result.contains(func1Name), "Expected " + func1Name + " to be listed. 
Got: " + result);
+    Assertions.assertTrue(
+        result.contains(func2Name), "Expected " + func2Name + " to be listed. 
Got: " + result);
+
+    // Cleanup
+    functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, func1Name));
+    functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, func2Name));
+  }
+
+  @Test
+  public void testNoFunctionsWhenSchemaIsEmpty() {
+    // Create a separate empty schema
+    String emptySchema = "gt_empty_udf_schema";
+    boolean exists = catalog.asSchemas().schemaExists(emptySchema);
+    if (!exists) {
+      catalog.asSchemas().createSchema(emptySchema, "empty schema", 
Collections.emptyMap());
+    }
+
+    String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+    String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, 
emptySchema);
+    String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+    LOG.info("SHOW FUNCTIONS for empty schema: {}", result);
+    // Verify no Gravitino-registered functions appear; check for specific 
function names
+    // that would only exist if registered via Gravitino (not built-in Trino 
functions)
+    Assertions.assertFalse(
+        result.contains("test_add_one"),
+        "Expected no Gravitino-registered UDFs in empty schema. Got: " + 
result);
+    Assertions.assertFalse(
+        result.contains("test_add_five"),
+        "Expected no Gravitino-registered UDFs in empty schema. Got: " + 
result);
+
+    // Cleanup
+    catalog.asSchemas().dropSchema(emptySchema, false);
+  }
+}
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index 7cf72da146..ba3ca624dc 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -53,21 +53,35 @@ import io.trino.spi.connector.SystemTable;
 import io.trino.spi.connector.TopNApplicationResult;
 import io.trino.spi.expression.ConnectorExpression;
 import io.trino.spi.expression.Constant;
+import io.trino.spi.function.LanguageFunction;
+import io.trino.spi.function.SchemaFunctionName;
 import io.trino.spi.security.TrinoPrincipal;
 import io.trino.spi.statistics.ColumnStatistics;
 import io.trino.spi.statistics.TableStatistics;
 import io.trino.spi.type.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.SQLImpl;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
 import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
 import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The GravitinoMetadata class provides operations for Apache Gravitino 
metadata on the Gravitino
@@ -76,6 +90,8 @@ import 
org.apache.gravitino.trino.connector.metadata.GravitinoTable;
  */
 public abstract class GravitinoMetadata implements ConnectorMetadata {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoMetadata.class);
+
   // The column handle name that will generate row IDs for the merge operation.
   public static final String MERGE_ROW_ID = "$row_id";
 
@@ -679,4 +695,82 @@ public abstract class GravitinoMetadata implements 
ConnectorMetadata {
     }
     return internalMetadataColumnMetadata.getName();
   }
+
+  @Override
+  public Collection<LanguageFunction> listLanguageFunctions(
+      ConnectorSession session, String schemaName) {
+    if (!catalogConnectorMetadata.supportsFunctions()) {
+      return List.of();
+    }
+    return 
Arrays.stream(catalogConnectorMetadata.listFunctionInfos(schemaName))
+        .flatMap(function -> toLanguageFunctions(function).stream())
+        .toList();
+  }
+
+  @Override
+  public Collection<LanguageFunction> getLanguageFunctions(
+      ConnectorSession session, SchemaFunctionName name) {
+    if (!catalogConnectorMetadata.supportsFunctions()) {
+      return List.of();
+    }
+    try {
+      Function function =
+          catalogConnectorMetadata.getFunction(name.getSchemaName(), 
name.getFunctionName());
+      if (function == null) {
+        return List.of();
+      }
+      return toLanguageFunctions(function);
+    } catch (NoSuchFunctionException e) {
+      LOG.debug("Function {} not found in schema {}", name.getFunctionName(), 
name.getSchemaName());
+      return List.of();
+    }
+  }
+
+  /**
+   * Converts a Gravitino function to a collection of Trino LanguageFunction 
instances. Only SQL
+   * implementations with TRINO runtime are included. Each definition with a 
Trino SQL
+   * implementation produces one LanguageFunction. The signature token is 
generated from the
+   * function name and parameter types.
+   */
+  private Collection<LanguageFunction> toLanguageFunctions(Function function) {
+    List<LanguageFunction> result = new ArrayList<>();
+    for (FunctionDefinition definition : function.definitions()) {
+      for (FunctionImpl impl : definition.impls()) {
+        if (!isTrinoSqlImplementation(impl)) {
+          continue;
+        }
+        String sql = ((SQLImpl) impl).sql();
+        try {
+          String signatureToken = buildSignatureToken(function.name(), 
definition.parameters());
+          result.add(new LanguageFunction(signatureToken, sql, List.of(), 
Optional.empty()));
+        } catch (TrinoException e) {
+          LOG.warn("Failed to build signature token for function {}", 
function.name(), e);
+        }
+      }
+    }
+    return result;
+  }
+
+  private boolean isTrinoSqlImplementation(FunctionImpl impl) {
+    return FunctionImpl.RuntimeType.TRINO.equals(impl.runtime())
+        && FunctionImpl.Language.SQL.equals(impl.language());
+  }
+
+  /**
+   * Builds a signature token from function name and parameters. The token 
uses Trino type names
+   * (e.g., varchar instead of string) and is lowercase as required by Trino's 
LanguageFunction.
+   */
+  private String buildSignatureToken(String functionName, FunctionParam[] 
params) {
+    StringBuilder sb = new 
StringBuilder(functionName.toLowerCase(Locale.ENGLISH));
+    sb.append("(");
+    for (int i = 0; i < params.length; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      Type trinoType = 
metadataAdapter.getDataTypeTransformer().getTrinoType(params[i].dataType());
+      sb.append(trinoType.getDisplayName().toLowerCase(Locale.ENGLISH));
+    }
+    sb.append(")");
+    return sb.toString();
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
index 3e2727ac2b..7fd83e7575 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
@@ -24,6 +24,7 @@ import io.trino.spi.connector.SchemaTableName;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
@@ -36,6 +37,8 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
@@ -44,16 +47,21 @@ import 
org.apache.gravitino.trino.connector.GravitinoErrorCode;
 import org.apache.gravitino.trino.connector.metadata.GravitinoColumn;
 import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
 import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** This class implements Apache Gravitino metadata operators. */
 public class CatalogConnectorMetadata {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(CatalogConnectorMetadata.class);
+
   private static final String CATALOG_DOES_NOT_EXIST_MSG = "Catalog does not 
exist";
   private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema does not 
exist";
 
   private final String catalogName;
   private final SupportsSchemas schemaCatalog;
   private final TableCatalog tableCatalog;
+  @Nullable private final FunctionCatalog functionCatalog;
 
   /**
    * Constructs a new CatalogConnectorMetadata.
@@ -68,6 +76,13 @@ public class CatalogConnectorMetadata {
       // Make sure the catalog support schema operations.
       this.schemaCatalog = catalog.asSchemas();
       this.tableCatalog = catalog.asTableCatalog();
+      FunctionCatalog fc = null;
+      try {
+        fc = catalog.asFunctionCatalog();
+      } catch (UnsupportedOperationException e) {
+        LOG.debug("Catalog {} does not support function operations", 
catalogName);
+      }
+      this.functionCatalog = fc;
     } catch (NoSuchCatalogException e) {
       throw new TrinoException(
           GravitinoErrorCode.GRAVITINO_CATALOG_NOT_EXISTS, 
CATALOG_DOES_NOT_EXIST_MSG, e);
@@ -396,4 +411,47 @@ public class CatalogConnectorMetadata {
     String[] columnNames = {columnName};
     applyAlter(schemaTableName, TableChange.updateColumnType(columnNames, 
type));
   }
+
+  /**
+   * Checks whether the catalog supports function operations.
+   *
+   * @return true if the catalog supports function operations, false otherwise
+   */
+  public boolean supportsFunctions() {
+    return functionCatalog != null;
+  }
+
+  /**
+   * Lists all functions with details in the specified schema.
+   *
+   * @param schemaName the name of the schema
+   * @return an array of functions
+   * @throws UnsupportedOperationException if the catalog does not support 
functions
+   */
+  public Function[] listFunctionInfos(String schemaName) {
+    if (!supportsFunctions()) {
+      throw new UnsupportedOperationException("Catalog does not support 
functions");
+    }
+    try {
+      return functionCatalog.listFunctionInfos(Namespace.of(schemaName));
+    } catch (NoSuchSchemaException e) {
+      throw new TrinoException(
+          GravitinoErrorCode.GRAVITINO_SCHEMA_NOT_EXISTS, 
SCHEMA_DOES_NOT_EXIST_MSG, e);
+    }
+  }
+
+  /**
+   * Retrieves a function by its schema and function name.
+   *
+   * @param schemaName the name of the schema
+   * @param functionName the name of the function
+   * @return the function
+   * @throws UnsupportedOperationException if the catalog does not support 
functions
+   */
+  public Function getFunction(String schemaName, String functionName) {
+    if (!supportsFunctions()) {
+      throw new UnsupportedOperationException("Catalog does not support 
functions");
+    }
+    return functionCatalog.getFunction(NameIdentifier.of(schemaName, 
functionName));
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataFunction.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataFunction.java
new file mode 100644
index 0000000000..bc7cfa0568
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataFunction.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.function.LanguageFunction;
+import io.trino.spi.function.SchemaFunctionName;
+import java.util.Collection;
+import java.util.List;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoMetadataFunction {
+
+  @Test
+  public void testListLanguageFunctionsReturnsTrinoSqlFunctions() {
+    Function function =
+        createMockFunction("my_func", "RETURN x + 1", 
FunctionImpl.RuntimeType.TRINO);
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.listFunctionInfos("test_schema")).thenReturn(new 
Function[] {function});
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions = 
metadata.listLanguageFunctions(session, "test_schema");
+    assertEquals(1, functions.size());
+
+    LanguageFunction langFunc = functions.iterator().next();
+    assertEquals("RETURN x + 1", langFunc.sql());
+    assertEquals("my_func(integer)", langFunc.signatureToken());
+  }
+
+  @Test
+  public void testGetLanguageFunctionsReturnsTrinoSqlFunctions() {
+    Function function =
+        createMockFunction("my_func", "RETURN x + 1", 
FunctionImpl.RuntimeType.TRINO);
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("test_schema", 
"my_func")).thenReturn(function);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new 
SchemaFunctionName("test_schema", "my_func"));
+    assertEquals(1, functions.size());
+
+    LanguageFunction langFunc = functions.iterator().next();
+    assertEquals("RETURN x + 1", langFunc.sql());
+  }
+
+  @Test
+  public void testListLanguageFunctionsFiltersNonTrinoRuntime() {
+    Function sparkFunction =
+        createMockFunction("spark_func", "RETURN 1", 
FunctionImpl.RuntimeType.SPARK);
+    Function trinoFunction =
+        createMockFunction("trino_func", "RETURN 2", 
FunctionImpl.RuntimeType.TRINO);
+
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.listFunctionInfos("test_schema"))
+        .thenReturn(new Function[] {sparkFunction, trinoFunction});
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions = 
metadata.listLanguageFunctions(session, "test_schema");
+    assertEquals(1, functions.size());
+    assertEquals("RETURN 2", functions.iterator().next().sql());
+  }
+
+  @Test
+  public void testListLanguageFunctionsWhenUnsupported() {
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(false);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions = 
metadata.listLanguageFunctions(session, "test_schema");
+    assertTrue(functions.isEmpty());
+  }
+
+  @Test
+  public void testGetLanguageFunctionsWhenFunctionNotFound() {
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("test_schema", "no_such_func"))
+        .thenThrow(new NoSuchFunctionException("Function not found"));
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(
+            session, new SchemaFunctionName("test_schema", "no_such_func"));
+    assertTrue(functions.isEmpty());
+  }
+
+  @Test
+  public void testGetLanguageFunctionsWhenUnsupported() {
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(false);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new 
SchemaFunctionName("test_schema", "my_func"));
+    assertTrue(functions.isEmpty());
+  }
+
+  @Test
+  public void testMultipleDefinitionsAndImpls() {
+    FunctionParam param1 = createMockParam("x", Types.IntegerType.get());
+    FunctionParam param2 = createMockParam("x", Types.StringType.get());
+
+    FunctionImpl trinoSqlImpl1 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x + 1");
+    FunctionImpl trinoSqlImpl2 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN 
length(x)");
+    FunctionImpl sparkImpl = 
FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "RETURN x * 2");
+
+    FunctionDefinition def1 = createMockDefinition(new FunctionParam[] 
{param1}, trinoSqlImpl1);
+    FunctionDefinition def2 =
+        createMockDefinition(new FunctionParam[] {param2}, trinoSqlImpl2, 
sparkImpl);
+
+    Function function = createMockFunctionWithDefinitions("multi_func", def1, 
def2);
+
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.listFunctionInfos("test_schema")).thenReturn(new 
Function[] {function});
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions = 
metadata.listLanguageFunctions(session, "test_schema");
+    // Should have 2 Trino SQL impls (one from def1, one from def2; sparkImpl 
filtered out)
+    assertEquals(2, functions.size());
+
+    List<String> sqlBodies = 
functions.stream().map(LanguageFunction::sql).sorted().toList();
+    assertEquals("RETURN length(x)", sqlBodies.get(0));
+    assertEquals("RETURN x + 1", sqlBodies.get(1));
+  }
+
+  @Test
+  public void testSignatureTokenFormat() {
+    FunctionParam param1 = createMockParam("a", Types.IntegerType.get());
+    FunctionParam param2 = createMockParam("b", Types.StringType.get());
+
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, 
"RETURN a");
+    FunctionDefinition def = createMockDefinition(new FunctionParam[] {param1, 
param2}, impl);
+    Function function = createMockFunctionWithDefinitions("test_func", def);
+
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("s", "test_func")).thenReturn(function);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new SchemaFunctionName("s", 
"test_func"));
+    assertEquals(1, functions.size());
+    assertEquals("test_func(integer,varchar)", 
functions.iterator().next().signatureToken());
+  }
+
+  @Test
+  public void testNoArgsFunction() {
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, 
"RETURN 42");
+    FunctionDefinition def = createMockDefinition(new FunctionParam[] {}, 
impl);
+    Function function = createMockFunctionWithDefinitions("const_func", def);
+
+    CatalogConnectorMetadata catalogMetadata = 
mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("s", "const_func")).thenReturn(function);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new SchemaFunctionName("s", 
"const_func"));
+    assertEquals(1, functions.size());
+    LanguageFunction lf = functions.iterator().next();
+    assertEquals("const_func()", lf.signatureToken());
+    assertEquals("RETURN 42", lf.sql());
+  }
+
+  private GravitinoMetadata createTestMetadata(CatalogConnectorMetadata 
catalogMetadata) {
+    CatalogConnectorMetadataAdapter metadataAdapter = 
mock(CatalogConnectorMetadataAdapter.class);
+    when(metadataAdapter.getDataTypeTransformer()).thenReturn(new 
GeneralDataTypeTransformer());
+    ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+    // Use a concrete anonymous subclass since GravitinoMetadata is abstract
+    return new GravitinoMetadata(catalogMetadata, metadataAdapter, 
internalMetadata) {
+      // No need to implement abstract methods for these tests since they are 
version-specific
+    };
+  }
+
+  private Function createMockFunction(String name, String sql, 
FunctionImpl.RuntimeType runtime) {
+    FunctionParam param = createMockParam("x", Types.IntegerType.get());
+    FunctionImpl impl = FunctionImpls.ofSql(runtime, sql);
+    FunctionDefinition definition = createMockDefinition(new FunctionParam[] 
{param}, impl);
+    return createMockFunctionWithDefinitions(name, definition);
+  }
+
+  private Function createMockFunctionWithDefinitions(
+      String name, FunctionDefinition... definitions) {
+    Function function = mock(Function.class);
+    when(function.name()).thenReturn(name);
+    when(function.functionType()).thenReturn(FunctionType.SCALAR);
+    when(function.deterministic()).thenReturn(true);
+    when(function.definitions()).thenReturn(definitions);
+    Audit audit = mock(Audit.class);
+    when(function.auditInfo()).thenReturn(audit);
+    return function;
+  }
+
+  private FunctionDefinition createMockDefinition(FunctionParam[] params, 
FunctionImpl... impls) {
+    FunctionDefinition definition = mock(FunctionDefinition.class);
+    when(definition.parameters()).thenReturn(params);
+    when(definition.impls()).thenReturn(impls);
+    return definition;
+  }
+
+  private FunctionParam createMockParam(String name, 
org.apache.gravitino.rel.types.Type type) {
+    FunctionParam param = mock(FunctionParam.class);
+    when(param.name()).thenReturn(name);
+    when(param.dataType()).thenReturn(type);
+    return param;
+  }
+}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorMetadataFunction.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorMetadataFunction.java
new file mode 100644
index 0000000000..2a9a20a075
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorMetadataFunction.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.TableCatalog;
+import org.junit.jupiter.api.Test;
+
+public class TestCatalogConnectorMetadataFunction {
+
+  @Test
+  public void testSupportsFunctionsWhenCatalogSupports() {
+    CatalogConnectorMetadata metadata = 
createMetadataWithFunctionCatalog(true);
+    assertTrue(metadata.supportsFunctions());
+  }
+
+  @Test
+  public void testSupportsFunctionsWhenCatalogDoesNotSupport() {
+    CatalogConnectorMetadata metadata = 
createMetadataWithFunctionCatalog(false);
+    assertFalse(metadata.supportsFunctions());
+  }
+
+  @Test
+  public void testListFunctionInfosReturnsFunctions() {
+    Function mockFunction = createMockFunction("test_func");
+    FunctionCatalog functionCatalog = mock(FunctionCatalog.class);
+    when(functionCatalog.listFunctionInfos(any(Namespace.class)))
+        .thenReturn(new Function[] {mockFunction});
+
+    CatalogConnectorMetadata metadata = 
createMetadataWithMockFunctionCatalog(functionCatalog);
+    Function[] functions = metadata.listFunctionInfos("test_schema");
+    assertEquals(1, functions.length);
+    assertEquals("test_func", functions[0].name());
+  }
+
+  @Test
+  public void testListFunctionInfosThrowsWhenUnsupported() {
+    CatalogConnectorMetadata metadata = 
createMetadataWithFunctionCatalog(false);
+    assertThrows(
+        UnsupportedOperationException.class, () -> 
metadata.listFunctionInfos("test_schema"));
+  }
+
+  @Test
+  public void testGetFunctionReturnsFunction() {
+    Function mockFunction = createMockFunction("my_func");
+    FunctionCatalog functionCatalog = mock(FunctionCatalog.class);
+    
when(functionCatalog.getFunction(any(NameIdentifier.class))).thenReturn(mockFunction);
+
+    CatalogConnectorMetadata metadata = 
createMetadataWithMockFunctionCatalog(functionCatalog);
+    Function function = metadata.getFunction("test_schema", "my_func");
+    assertNotNull(function);
+    assertEquals("my_func", function.name());
+  }
+
+  @Test
+  public void testGetFunctionThrowsWhenNotFound() {
+    FunctionCatalog functionCatalog = mock(FunctionCatalog.class);
+    when(functionCatalog.getFunction(any(NameIdentifier.class)))
+        .thenThrow(new NoSuchFunctionException("Function does not exist"));
+
+    CatalogConnectorMetadata metadata = 
createMetadataWithMockFunctionCatalog(functionCatalog);
+    assertThrows(
+        NoSuchFunctionException.class, () -> 
metadata.getFunction("test_schema", "no_such_func"));
+  }
+
+  @Test
+  public void testGetFunctionThrowsWhenUnsupported() {
+    CatalogConnectorMetadata metadata = 
createMetadataWithFunctionCatalog(false);
+    assertThrows(
+        UnsupportedOperationException.class, () -> 
metadata.getFunction("test_schema", "my_func"));
+  }
+
+  private CatalogConnectorMetadata createMetadataWithFunctionCatalog(boolean 
supportsFunctions) {
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.name()).thenReturn("test_catalog");
+    when(metalake.loadCatalog(anyString())).thenReturn(catalog);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    if (supportsFunctions) {
+      
when(catalog.asFunctionCatalog()).thenReturn(mock(FunctionCatalog.class));
+    } else {
+      when(catalog.asFunctionCatalog())
+          .thenThrow(new UnsupportedOperationException("Not supported"));
+    }
+    return new CatalogConnectorMetadata(metalake, 
NameIdentifier.of("metalake", "test_catalog"));
+  }
+
+  private CatalogConnectorMetadata createMetadataWithMockFunctionCatalog(
+      FunctionCatalog functionCatalog) {
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.name()).thenReturn("test_catalog");
+    when(metalake.loadCatalog(anyString())).thenReturn(catalog);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    when(catalog.asFunctionCatalog()).thenReturn(functionCatalog);
+    return new CatalogConnectorMetadata(metalake, 
NameIdentifier.of("metalake", "test_catalog"));
+  }
+
+  private Function createMockFunction(String name) {
+    Function function = mock(Function.class);
+    when(function.name()).thenReturn(name);
+    when(function.functionType()).thenReturn(FunctionType.SCALAR);
+    when(function.deterministic()).thenReturn(true);
+    when(function.definitions()).thenReturn(new FunctionDefinition[0]);
+    Audit audit = mock(Audit.class);
+    when(audit.creator()).thenReturn("test");
+    when(audit.createTime()).thenReturn(Instant.now());
+    when(function.auditInfo()).thenReturn(audit);
+    return function;
+  }
+}

Reply via email to