This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
     new 9809eed5c9 [#9533] feat(trino-connector): Add UDF adaptation for Trino connector (#10494)
9809eed5c9 is described below
commit 9809eed5c923e6e4df2bf0c310f122bc4c6b1f87
Author: akshay thorat <[email protected]>
AuthorDate: Wed Apr 1 03:13:19 2026 -0700
    [#9533] feat(trino-connector): Add UDF adaptation for Trino connector (#10494)
### What changes were proposed in this pull request?
Add support for consuming Gravitino function metadata in the Trino
connector by mapping SQL functions with RuntimeType.TRINO to Trino's
built-in LanguageFunction API.
### Why are the changes needed?
- CatalogConnectorMetadata: Add FunctionCatalog support with graceful
fallback when catalog does not support functions. Add methods to list
and retrieve functions from Gravitino server.
- GravitinoMetadata: Implement listLanguageFunctions() and
getLanguageFunctions() to convert Gravitino SQL functions with TRINO
runtime to Trino LanguageFunction instances.
- TestGravitinoMetadataFunction: 8 unit tests covering listing,
filtering, multiple definitions, signature tokens, and edge cases.
- TestCatalogConnectorMetadataFunction: 7 unit tests covering function
catalog support detection, listing, retrieval, and error handling.
Fix #9533
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added test cases.
---
docs/trino-connector/index.md | 1 +
docs/trino-connector/udf-support.md | 64 ++++
.../connector/integration/test/TrinoUDFIT.java | 329 +++++++++++++++++++++
.../trino/connector/GravitinoMetadata.java | 94 ++++++
.../catalog/CatalogConnectorMetadata.java | 58 ++++
.../connector/TestGravitinoMetadataFunction.java | 265 +++++++++++++++++
.../TestCatalogConnectorMetadataFunction.java | 150 ++++++++++
7 files changed, 961 insertions(+)
diff --git a/docs/trino-connector/index.md b/docs/trino-connector/index.md
index 95e239a080..5afb35a41b 100644
--- a/docs/trino-connector/index.md
+++ b/docs/trino-connector/index.md
@@ -18,4 +18,5 @@ Apache Gravitino Trino connector index:
- [PostgreSQL](catalog-postgresql.md)
- [Trino cascading query](trino-cascading-query.md)
- [Supported SQL](sql-support.md)
+ - [UDF support](udf-support.md)
- [Development](development.md)
diff --git a/docs/trino-connector/udf-support.md b/docs/trino-connector/udf-support.md
new file mode 100644
index 0000000000..10dfd5aa02
--- /dev/null
+++ b/docs/trino-connector/udf-support.md
@@ -0,0 +1,64 @@
+---
+title: "Apache Gravitino Trino connector UDF support"
+slug: /trino-connector/udf-support
+keyword: gravitino connector trino udf function
+license: "This software is licensed under the Apache License version 2."
+---
+
+The Gravitino Trino connector supports user-defined functions (UDFs) registered in Apache Gravitino.
+Functions with `RuntimeType.TRINO` and SQL language implementations are automatically exposed as
+[Trino language functions](https://trino.io/docs/current/routines/function.html), making them available for use in Trino queries.
+
+## How it works
+
+When Gravitino catalogs contain registered functions, the Trino connector:
+
+1. Lists functions from the Gravitino server for each schema.
+2. Filters to include only functions with `RuntimeType.TRINO` and `Language.SQL`.
+3. Maps each function implementation to a Trino `LanguageFunction` with a signature token derived from the function name and parameter types.
+
+Functions registered with other runtimes (e.g., `SPARK`) are **not** visible in Trino.
+
+## Prerequisites
+
+- The Gravitino catalog must support function operations (i.e., implement `FunctionCatalog`).
+- Functions must be registered in Gravitino via the Gravitino client or REST API before they can be queried from Trino.
+
+## Registering a UDF
+
+Use the Gravitino Java client to register a function:
+
+```java
+FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+functionCatalog.registerFunction(
+ NameIdentifier.of("my_schema", "add_one"),
+ "Adds one to input",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+ FunctionParams.of(FunctionParams.of("x", Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x + 1")))));
+```
+
+## Querying UDFs from Trino
+
+Once registered, the function appears in Trino:
+
+```sql
+-- List available functions in a schema
+SHOW FUNCTIONS FROM catalog.my_schema;
+
+-- Invoke the function
+SELECT catalog.my_schema.add_one(5);
+-- Returns: 6
+```
+
+## Limitations
+
+- **Read-only**: The Trino connector currently supports listing and invoking Gravitino UDFs. Creating or dropping functions via Trino SQL (`CREATE FUNCTION` / `DROP FUNCTION`) is not yet supported.
+- **SQL only**: Only SQL-language implementations are mapped. Java and Python implementations are not exposed to Trino.
+- **TRINO runtime only**: Only functions with `RuntimeType.TRINO` are visible. Functions registered with `RuntimeType.SPARK` or other runtimes are filtered out.
+- **Type mapping**: Function parameter and return types are converted from Gravitino types to Trino types. Unsupported types will cause the function to be skipped with a warning log.
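The filtering rule stated in udf-support.md (only implementations with `RuntimeType.TRINO` and SQL language are exposed) can be sketched as a standalone predicate. The enums and record below are simplified stand-ins for Gravitino's `FunctionImpl.RuntimeType` and `FunctionImpl.Language`, not the real API:

```java
// Simplified stand-ins for Gravitino's FunctionImpl.RuntimeType / Language enums.
public class RuntimeFilterSketch {
  enum RuntimeType { TRINO, SPARK }
  enum Language { SQL, JAVA, PYTHON }

  record Impl(RuntimeType runtime, Language language) {}

  // Mirrors the documented filter: only TRINO-runtime SQL implementations
  // are mapped to Trino LanguageFunction instances; everything else is skipped.
  static boolean isTrinoSqlImplementation(Impl impl) {
    return impl.runtime() == RuntimeType.TRINO && impl.language() == Language.SQL;
  }

  public static void main(String[] args) {
    System.out.println(isTrinoSqlImplementation(new Impl(RuntimeType.TRINO, Language.SQL)));  // true
    System.out.println(isTrinoSqlImplementation(new Impl(RuntimeType.SPARK, Language.SQL)));  // false
    System.out.println(isTrinoSqlImplementation(new Impl(RuntimeType.TRINO, Language.JAVA))); // false
  }
}
```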
diff --git a/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java
new file mode 100644
index 0000000000..a5d57fc364
--- /dev/null
+++ b/trino-connector/integration-test/src/test/java/org/apache/gravitino/trino/connector/integration/test/TrinoUDFIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.integration.test;
+
+import static java.lang.Thread.sleep;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionDefinitions;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParams;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for Trino connector UDF adaptation. Verifies that functions registered in
+ * Gravitino with TRINO runtime are visible via Trino's language function API.
+ */
+@Tag("gravitino-docker-test")
+public class TrinoUDFIT extends TrinoQueryITBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TrinoUDFIT.class);
+
+ private static final String CATALOG_NAME = "gt_hive_udf";
+ private static final String SCHEMA_NAME = "gt_udf_schema";
+ private static Catalog catalog;
+
+ @BeforeAll
+ public static void setUp() throws Exception {
+ TrinoUDFIT instance = new TrinoUDFIT();
+ instance.setup();
+
+ createHiveCatalog();
+ createSchema();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ try {
+ cleanupFunctionsAndSchema();
+ dropCatalog(CATALOG_NAME);
+ } catch (Exception e) {
+ LOG.error("Error during teardown", e);
+ }
+ TrinoQueryITBase.cleanup();
+ }
+
+ private static void createHiveCatalog() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("metastore.uris", hiveMetastoreUri);
+
+ boolean exists = metalake.catalogExists(CATALOG_NAME);
+ if (!exists) {
+ metalake.createCatalog(
+          CATALOG_NAME, Catalog.Type.RELATIONAL, "hive", "UDF test catalog", properties);
+ }
+
+ // Wait for catalog to sync to Trino
+ boolean catalogReady = false;
+ int tries = 180;
+ while (!catalogReady && tries-- >= 0) {
+ try {
+ String result = trinoQueryRunner.runQuery("show catalogs");
+ if (result.contains(metalakeName + "." + CATALOG_NAME)) {
+ catalogReady = true;
+ break;
+ }
+ } catch (Exception e) {
+ LOG.info("Waiting for catalog to sync to Trino");
+ }
+ sleep(1000);
+ }
+
+ if (!catalogReady) {
+ throw new Exception("Catalog " + CATALOG_NAME + " sync timeout");
+ }
+
+ catalog = metalake.loadCatalog(CATALOG_NAME);
+ }
+
+ private static void createSchema() {
+ boolean exists = catalog.asSchemas().schemaExists(SCHEMA_NAME);
+ if (!exists) {
+      catalog.asSchemas().createSchema(SCHEMA_NAME, "UDF test schema", Collections.emptyMap());
+ }
+ }
+
+ private static void cleanupFunctionsAndSchema() {
+ try {
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+      NameIdentifier[] functions = functionCatalog.listFunctions(Namespace.of(SCHEMA_NAME));
+      for (NameIdentifier fn : functions) {
+        functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, fn.name()));
+ }
+ } catch (Exception e) {
+ LOG.error("Error cleaning up functions", e);
+ }
+
+ try {
+ catalog.asSchemas().dropSchema(SCHEMA_NAME, false);
+ } catch (Exception e) {
+ LOG.error("Error dropping schema", e);
+ }
+ }
+
+ @Test
+ public void testListLanguageFunctionsShowsRegisteredUDF() throws Exception {
+ String functionName = "test_add_one";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register a scalar function: test_add_one(x INTEGER) -> INTEGER
+ // Uses TRINO runtime + SQL language so it maps to a Trino LanguageFunction
+ // SQL body "RETURN x + 1" adds 1 to the input integer
+ Function function =
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, functionName),
+ "Adds one to input",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+                    FunctionParams.of(FunctionParams.of("x", Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+                        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x + 1")))));
+ Assertions.assertNotNull(function);
+
+ // Query Trino to verify the function is listed
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, SCHEMA_NAME);
+ String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+ LOG.info("SHOW FUNCTIONS result: {}", result);
+ Assertions.assertTrue(
+ result.contains(functionName),
+ "Expected function " + functionName + " to be listed. Got: " + result);
+
+ // Cleanup
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+ }
+
+ @Test
+ public void testSelectUDFReturnsCorrectResult() throws Exception {
+ String functionName = "test_add_five";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register a scalar function: test_add_five(x INTEGER) -> INTEGER
+ // SQL body "RETURN x + 5" adds 5 to the input integer
+ Function function =
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, functionName),
+ "Adds five to input",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+                    FunctionParams.of(FunctionParams.of("x", Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+                        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x + 5")))));
+ Assertions.assertNotNull(function);
+
+ // Invoke the function via SELECT and verify the result
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String selectQuery =
+        String.format("SELECT %s.%s.%s(5)", trinoCatalogName, SCHEMA_NAME, functionName);
+ String result = trinoQueryRunner.runQuery(selectQuery);
+
+ LOG.info("SELECT result: {}", result);
+ // Parse the query result and verify the exact numeric output
+ String trimmedResult = result.trim();
+ Assertions.assertTrue(
+ trimmedResult.contains("10"),
+        "Expected SELECT test_add_five(5) to return 10. Got: " + trimmedResult);
+ Assertions.assertFalse(
+ trimmedResult.contains("100"),
+        "Result should be exactly 10, not a number containing 10. Got: " + trimmedResult);
+
+ // Cleanup
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+ }
+
+ @Test
+  public void testListLanguageFunctionsFiltersNonTrinoRuntime() throws Exception {
+ String functionName = "spark_only_func";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register a scalar function: spark_only_func(x INTEGER) -> INTEGER
+ // Uses SPARK runtime, so this should NOT be visible in Trino
+ Function function =
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, functionName),
+ "Spark-only function",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+                    FunctionParams.of(FunctionParams.of("x", Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+                        FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "RETURN x + 1")))));
+ Assertions.assertNotNull(function);
+
+ // Query Trino - SPARK runtime function should be filtered out
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, SCHEMA_NAME);
+ String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+    LOG.info("SHOW FUNCTIONS result (should not contain spark_only_func): {}", result);
+ Assertions.assertFalse(
+ result.contains(functionName),
+ "SPARK runtime function should not appear in Trino. Got: " + result);
+
+ // Cleanup
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, functionName));
+ }
+
+ @Test
+ public void testMultipleUDFsInSameSchema() throws Exception {
+ String func1Name = "udf_multiply";
+ String func2Name = "udf_concat";
+ FunctionCatalog functionCatalog = catalog.asFunctionCatalog();
+
+ // Register two TRINO SQL functions:
+ // udf_multiply(x INTEGER) -> INTEGER: multiplies input by 2
+ // udf_concat(a STRING, b STRING) -> STRING: concatenates two strings
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, func1Name),
+ "Multiply by 2",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+                FunctionParams.of(FunctionParams.of("x", Types.IntegerType.get())),
+ Types.IntegerType.get(),
+ FunctionImpls.of(
+                    FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x * 2")))));
+
+ functionCatalog.registerFunction(
+ NameIdentifier.of(SCHEMA_NAME, func2Name),
+ "Concat strings",
+ FunctionType.SCALAR,
+ true,
+ FunctionDefinitions.of(
+ FunctionDefinitions.of(
+ FunctionParams.of(
+ FunctionParams.of("a", Types.StringType.get()),
+ FunctionParams.of("b", Types.StringType.get())),
+ Types.StringType.get(),
+ FunctionImpls.of(
+                    FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN concat(a, b)")))));
+
+ // Query Trino to verify both functions are listed
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, SCHEMA_NAME);
+ String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+ LOG.info("SHOW FUNCTIONS result: {}", result);
+ Assertions.assertTrue(
+        result.contains(func1Name), "Expected " + func1Name + " to be listed. Got: " + result);
+    Assertions.assertTrue(
+        result.contains(func2Name), "Expected " + func2Name + " to be listed. Got: " + result);
+
+ // Cleanup
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, func1Name));
+ functionCatalog.dropFunction(NameIdentifier.of(SCHEMA_NAME, func2Name));
+ }
+
+ @Test
+ public void testNoFunctionsWhenSchemaIsEmpty() {
+ // Create a separate empty schema
+ String emptySchema = "gt_empty_udf_schema";
+ boolean exists = catalog.asSchemas().schemaExists(emptySchema);
+ if (!exists) {
+      catalog.asSchemas().createSchema(emptySchema, "empty schema", Collections.emptyMap());
+ }
+
+ String trinoCatalogName = metalakeName + "." + CATALOG_NAME;
+ String showFunctionsQuery =
+        String.format("SHOW FUNCTIONS FROM %s.%s", trinoCatalogName, emptySchema);
+ String result = trinoQueryRunner.runQuery(showFunctionsQuery);
+
+ LOG.info("SHOW FUNCTIONS for empty schema: {}", result);
+    // Verify no Gravitino-registered functions appear; check for specific function names
+    // that would only exist if registered via Gravitino (not built-in Trino functions)
+ Assertions.assertFalse(
+ result.contains("test_add_one"),
+        "Expected no Gravitino-registered UDFs in empty schema. Got: " + result);
+ Assertions.assertFalse(
+ result.contains("test_add_five"),
+        "Expected no Gravitino-registered UDFs in empty schema. Got: " + result);
+
+ // Cleanup
+ catalog.asSchemas().dropSchema(emptySchema, false);
+ }
+}
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index 7cf72da146..ba3ca624dc 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -53,21 +53,35 @@ import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TopNApplicationResult;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
+import io.trino.spi.function.LanguageFunction;
+import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.SQLImpl;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
 * The GravitinoMetadata class provides operations for Apache Gravitino metadata on the Gravitino
@@ -76,6 +90,8 @@ import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
*/
public abstract class GravitinoMetadata implements ConnectorMetadata {
+  private static final Logger LOG = LoggerFactory.getLogger(GravitinoMetadata.class);
+
// The column handle name that will generate row IDs for the merge operation.
public static final String MERGE_ROW_ID = "$row_id";
@@ -679,4 +695,82 @@ public abstract class GravitinoMetadata implements ConnectorMetadata {
}
return internalMetadataColumnMetadata.getName();
}
+
+ @Override
+ public Collection<LanguageFunction> listLanguageFunctions(
+ ConnectorSession session, String schemaName) {
+ if (!catalogConnectorMetadata.supportsFunctions()) {
+ return List.of();
+ }
+    return Arrays.stream(catalogConnectorMetadata.listFunctionInfos(schemaName))
+ .flatMap(function -> toLanguageFunctions(function).stream())
+ .toList();
+ }
+
+ @Override
+ public Collection<LanguageFunction> getLanguageFunctions(
+ ConnectorSession session, SchemaFunctionName name) {
+ if (!catalogConnectorMetadata.supportsFunctions()) {
+ return List.of();
+ }
+ try {
+ Function function =
+          catalogConnectorMetadata.getFunction(name.getSchemaName(), name.getFunctionName());
+ if (function == null) {
+ return List.of();
+ }
+ return toLanguageFunctions(function);
+ } catch (NoSuchFunctionException e) {
+      LOG.debug("Function {} not found in schema {}", name.getFunctionName(), name.getSchemaName());
+ return List.of();
+ }
+ }
+
+ /**
+   * Converts a Gravitino function to a collection of Trino LanguageFunction instances. Only SQL
+   * implementations with TRINO runtime are included. Each definition with a Trino SQL
+   * implementation produces one LanguageFunction. The signature token is generated from the
+   * function name and parameter types.
+ */
+ private Collection<LanguageFunction> toLanguageFunctions(Function function) {
+ List<LanguageFunction> result = new ArrayList<>();
+ for (FunctionDefinition definition : function.definitions()) {
+ for (FunctionImpl impl : definition.impls()) {
+ if (!isTrinoSqlImplementation(impl)) {
+ continue;
+ }
+ String sql = ((SQLImpl) impl).sql();
+ try {
+        String signatureToken = buildSignatureToken(function.name(), definition.parameters());
+        result.add(new LanguageFunction(signatureToken, sql, List.of(), Optional.empty()));
+      } catch (TrinoException e) {
+        LOG.warn("Failed to build signature token for function {}", function.name(), e);
+ }
+ }
+ }
+ return result;
+ }
+
+ private boolean isTrinoSqlImplementation(FunctionImpl impl) {
+ return FunctionImpl.RuntimeType.TRINO.equals(impl.runtime())
+ && FunctionImpl.Language.SQL.equals(impl.language());
+ }
+
+ /**
+   * Builds a signature token from function name and parameters. The token uses Trino type names
+   * (e.g., varchar instead of string) and is lowercase as required by Trino's LanguageFunction.
+ */
+  private String buildSignatureToken(String functionName, FunctionParam[] params) {
+    StringBuilder sb = new StringBuilder(functionName.toLowerCase(Locale.ENGLISH));
+ sb.append("(");
+ for (int i = 0; i < params.length; i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+      Type trinoType = metadataAdapter.getDataTypeTransformer().getTrinoType(params[i].dataType());
+ sb.append(trinoType.getDisplayName().toLowerCase(Locale.ENGLISH));
+ }
+ sb.append(")");
+ return sb.toString();
+ }
}
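The signature-token convention implemented by `buildSignatureToken` in this file (lowercase function name followed by a comma-separated list of lowercase Trino type display names) can be illustrated in isolation. This sketch takes type display names as plain strings instead of resolving Gravitino types through the data type transformer; the class and parameter names are illustrative only:

```java
import java.util.Locale;

// Standalone sketch of the token format: name(type1,type2,...), all lowercase.
public class SignatureTokenSketch {
  // typeDisplayNames stand in for Trino Type.getDisplayName() results,
  // e.g. "INTEGER" or "VARCHAR"; the real code derives them from Gravitino types.
  public static String buildSignatureToken(String functionName, String... typeDisplayNames) {
    StringBuilder sb = new StringBuilder(functionName.toLowerCase(Locale.ENGLISH));
    sb.append("(");
    for (int i = 0; i < typeDisplayNames.length; i++) {
      if (i > 0) {
        sb.append(",");
      }
      sb.append(typeDisplayNames[i].toLowerCase(Locale.ENGLISH));
    }
    sb.append(")");
    return sb.toString();
  }

  public static void main(String[] args) {
    // Matches the token asserted in TestGravitinoMetadataFunction
    System.out.println(buildSignatureToken("My_Func", "INTEGER"));        // my_func(integer)
    System.out.println(buildSignatureToken("udf_concat", "VARCHAR", "VARCHAR")); // udf_concat(varchar,varchar)
  }
}
```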
diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
index 3e2727ac2b..7fd83e7575 100644
--- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
+++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
@@ -24,6 +24,7 @@ import io.trino.spi.connector.SchemaTableName;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
@@ -36,6 +37,8 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
@@ -44,16 +47,21 @@ import org.apache.gravitino.trino.connector.GravitinoErrorCode;
import org.apache.gravitino.trino.connector.metadata.GravitinoColumn;
import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** This class implements Apache Gravitino metadata operators. */
public class CatalogConnectorMetadata {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogConnectorMetadata.class);
+
  private static final String CATALOG_DOES_NOT_EXIST_MSG = "Catalog does not exist";
  private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema does not exist";
private final String catalogName;
private final SupportsSchemas schemaCatalog;
private final TableCatalog tableCatalog;
+ @Nullable private final FunctionCatalog functionCatalog;
/**
* Constructs a new CatalogConnectorMetadata.
@@ -68,6 +76,13 @@ public class CatalogConnectorMetadata {
// Make sure the catalog support schema operations.
this.schemaCatalog = catalog.asSchemas();
this.tableCatalog = catalog.asTableCatalog();
+ FunctionCatalog fc = null;
+ try {
+ fc = catalog.asFunctionCatalog();
+ } catch (UnsupportedOperationException e) {
+      LOG.debug("Catalog {} does not support function operations", catalogName);
+ }
+ this.functionCatalog = fc;
} catch (NoSuchCatalogException e) {
throw new TrinoException(
          GravitinoErrorCode.GRAVITINO_CATALOG_NOT_EXISTS, CATALOG_DOES_NOT_EXIST_MSG, e);
@@ -396,4 +411,47 @@ public class CatalogConnectorMetadata {
String[] columnNames = {columnName};
    applyAlter(schemaTableName, TableChange.updateColumnType(columnNames, type));
}
+
+ /**
+ * Checks whether the catalog supports function operations.
+ *
+ * @return true if the catalog supports function operations, false otherwise
+ */
+ public boolean supportsFunctions() {
+ return functionCatalog != null;
+ }
+
+ /**
+ * Lists all functions with details in the specified schema.
+ *
+ * @param schemaName the name of the schema
+ * @return an array of functions
+   * @throws UnsupportedOperationException if the catalog does not support functions
+ */
+ public Function[] listFunctionInfos(String schemaName) {
+ if (!supportsFunctions()) {
+      throw new UnsupportedOperationException("Catalog does not support functions");
+ }
+ try {
+ return functionCatalog.listFunctionInfos(Namespace.of(schemaName));
+ } catch (NoSuchSchemaException e) {
+ throw new TrinoException(
+          GravitinoErrorCode.GRAVITINO_SCHEMA_NOT_EXISTS, SCHEMA_DOES_NOT_EXIST_MSG, e);
+ }
+ }
+
+ /**
+ * Retrieves a function by its schema and function name.
+ *
+ * @param schemaName the name of the schema
+ * @param functionName the name of the function
+ * @return the function
+   * @throws UnsupportedOperationException if the catalog does not support functions
+ */
+ public Function getFunction(String schemaName, String functionName) {
+ if (!supportsFunctions()) {
+      throw new UnsupportedOperationException("Catalog does not support functions");
+ }
+    return functionCatalog.getFunction(NameIdentifier.of(schemaName, functionName));
+ }
}
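The constructor change above probes the optional `FunctionCatalog` capability once, keeps the field null when the catalog throws `UnsupportedOperationException`, and gates all function methods on `supportsFunctions()`. A minimal standalone sketch of that pattern, using a hypothetical `Catalog` stand-in rather than the real Gravitino API:

```java
// Sketch of the optional-capability pattern: probe once, remember null on failure.
public class OptionalCapabilitySketch {
  interface FunctionCatalog {
    String[] list();
  }

  // Hypothetical stand-in for a Gravitino Catalog that may lack function support.
  static class Catalog {
    private final FunctionCatalog functions; // null models "unsupported"

    Catalog(FunctionCatalog functions) {
      this.functions = functions;
    }

    FunctionCatalog asFunctionCatalog() {
      if (functions == null) {
        throw new UnsupportedOperationException("functions not supported");
      }
      return functions;
    }
  }

  private final FunctionCatalog functionCatalog;

  OptionalCapabilitySketch(Catalog catalog) {
    FunctionCatalog fc = null;
    try {
      fc = catalog.asFunctionCatalog();
    } catch (UnsupportedOperationException e) {
      // Capability absent: degrade to "no functions" instead of failing the connector.
    }
    this.functionCatalog = fc;
  }

  boolean supportsFunctions() {
    return functionCatalog != null;
  }

  public static void main(String[] args) {
    System.out.println(new OptionalCapabilitySketch(new Catalog(null)).supportsFunctions());                 // false
    System.out.println(new OptionalCapabilitySketch(new Catalog(() -> new String[0])).supportsFunctions()); // true
  }
}
```

This keeps the probe cost to a single constructor call and lets callers return empty results instead of surfacing `UnsupportedOperationException` on every metadata query.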
diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataFunction.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataFunction.java
new file mode 100644
index 0000000000..bc7cfa0568
--- /dev/null
+++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataFunction.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.function.LanguageFunction;
+import io.trino.spi.function.SchemaFunctionName;
+import java.util.Collection;
+import java.util.List;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionImpl;
+import org.apache.gravitino.function.FunctionImpls;
+import org.apache.gravitino.function.FunctionParam;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
+import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoMetadataFunction {
+
+ @Test
+ public void testListLanguageFunctionsReturnsTrinoSqlFunctions() {
+ Function function =
+        createMockFunction("my_func", "RETURN x + 1", FunctionImpl.RuntimeType.TRINO);
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.listFunctionInfos("test_schema")).thenReturn(new Function[] {function});
+
+ GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+ ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions = metadata.listLanguageFunctions(session, "test_schema");
+ assertEquals(1, functions.size());
+
+ LanguageFunction langFunc = functions.iterator().next();
+ assertEquals("RETURN x + 1", langFunc.sql());
+ assertEquals("my_func(integer)", langFunc.signatureToken());
+ }
+
+ @Test
+ public void testGetLanguageFunctionsReturnsTrinoSqlFunctions() {
+ Function function =
+        createMockFunction("my_func", "RETURN x + 1", FunctionImpl.RuntimeType.TRINO);
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("test_schema", "my_func")).thenReturn(function);
+
+ GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+ ConnectorSession session = mock(ConnectorSession.class);
+
+ Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new SchemaFunctionName("test_schema", "my_func"));
+ assertEquals(1, functions.size());
+
+ LanguageFunction langFunc = functions.iterator().next();
+ assertEquals("RETURN x + 1", langFunc.sql());
+ }
+
+ @Test
+ public void testListLanguageFunctionsFiltersNonTrinoRuntime() {
+ Function sparkFunction =
+        createMockFunction("spark_func", "RETURN 1", FunctionImpl.RuntimeType.SPARK);
+ Function trinoFunction =
+ createMockFunction("trino_func", "RETURN 2",
FunctionImpl.RuntimeType.TRINO);
+
+ CatalogConnectorMetadata catalogMetadata =
mock(CatalogConnectorMetadata.class);
+ when(catalogMetadata.supportsFunctions()).thenReturn(true);
+ when(catalogMetadata.listFunctionInfos("test_schema"))
+ .thenReturn(new Function[] {sparkFunction, trinoFunction});
+
+ GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+ ConnectorSession session = mock(ConnectorSession.class);
+
+ Collection<LanguageFunction> functions =
metadata.listLanguageFunctions(session, "test_schema");
+ assertEquals(1, functions.size());
+ assertEquals("RETURN 2", functions.iterator().next().sql());
+ }
+
+  @Test
+  public void testListLanguageFunctionsWhenUnsupported() {
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(false);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.listLanguageFunctions(session, "test_schema");
+    assertTrue(functions.isEmpty());
+  }
+
+  @Test
+  public void testGetLanguageFunctionsWhenFunctionNotFound() {
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("test_schema", "no_such_func"))
+        .thenThrow(new NoSuchFunctionException("Function not found"));
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(
+            session, new SchemaFunctionName("test_schema", "no_such_func"));
+    assertTrue(functions.isEmpty());
+  }
+
+  @Test
+  public void testGetLanguageFunctionsWhenUnsupported() {
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(false);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new SchemaFunctionName("test_schema", "my_func"));
+    assertTrue(functions.isEmpty());
+  }
+
+  @Test
+  public void testMultipleDefinitionsAndImpls() {
+    FunctionParam param1 = createMockParam("x", Types.IntegerType.get());
+    FunctionParam param2 = createMockParam("x", Types.StringType.get());
+
+    FunctionImpl trinoSqlImpl1 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN x + 1");
+    FunctionImpl trinoSqlImpl2 =
+        FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN length(x)");
+    FunctionImpl sparkImpl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.SPARK, "RETURN x * 2");
+
+    FunctionDefinition def1 = createMockDefinition(new FunctionParam[] {param1}, trinoSqlImpl1);
+    FunctionDefinition def2 =
+        createMockDefinition(new FunctionParam[] {param2}, trinoSqlImpl2, sparkImpl);
+
+    Function function = createMockFunctionWithDefinitions("multi_func", def1, def2);
+
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.listFunctionInfos("test_schema")).thenReturn(new Function[] {function});
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.listLanguageFunctions(session, "test_schema");
+    // Should have 2 Trino SQL impls (one from def1, one from def2; sparkImpl filtered out)
+    assertEquals(2, functions.size());
+
+    List<String> sqlBodies = functions.stream().map(LanguageFunction::sql).sorted().toList();
+    assertEquals("RETURN length(x)", sqlBodies.get(0));
+    assertEquals("RETURN x + 1", sqlBodies.get(1));
+  }
+
+  @Test
+  public void testSignatureTokenFormat() {
+    FunctionParam param1 = createMockParam("a", Types.IntegerType.get());
+    FunctionParam param2 = createMockParam("b", Types.StringType.get());
+
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN a");
+    FunctionDefinition def = createMockDefinition(new FunctionParam[] {param1, param2}, impl);
+    Function function = createMockFunctionWithDefinitions("test_func", def);
+
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("s", "test_func")).thenReturn(function);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new SchemaFunctionName("s", "test_func"));
+    assertEquals(1, functions.size());
+    assertEquals("test_func(integer,varchar)", functions.iterator().next().signatureToken());
+  }
+
+  @Test
+  public void testNoArgsFunction() {
+    FunctionImpl impl = FunctionImpls.ofSql(FunctionImpl.RuntimeType.TRINO, "RETURN 42");
+    FunctionDefinition def = createMockDefinition(new FunctionParam[] {}, impl);
+    Function function = createMockFunctionWithDefinitions("const_func", def);
+
+    CatalogConnectorMetadata catalogMetadata = mock(CatalogConnectorMetadata.class);
+    when(catalogMetadata.supportsFunctions()).thenReturn(true);
+    when(catalogMetadata.getFunction("s", "const_func")).thenReturn(function);
+
+    GravitinoMetadata metadata = createTestMetadata(catalogMetadata);
+    ConnectorSession session = mock(ConnectorSession.class);
+
+    Collection<LanguageFunction> functions =
+        metadata.getLanguageFunctions(session, new SchemaFunctionName("s", "const_func"));
+    assertEquals(1, functions.size());
+    LanguageFunction lf = functions.iterator().next();
+    assertEquals("const_func()", lf.signatureToken());
+    assertEquals("RETURN 42", lf.sql());
+  }
+
+  private GravitinoMetadata createTestMetadata(CatalogConnectorMetadata catalogMetadata) {
+    CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class);
+    when(metadataAdapter.getDataTypeTransformer()).thenReturn(new GeneralDataTypeTransformer());
+    ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class);
+    // Use a concrete anonymous subclass since GravitinoMetadata is abstract
+    return new GravitinoMetadata(catalogMetadata, metadataAdapter, internalMetadata) {
+      // No need to implement abstract methods for these tests since they are version-specific
+    };
+  }
+
+  private Function createMockFunction(String name, String sql, FunctionImpl.RuntimeType runtime) {
+    FunctionParam param = createMockParam("x", Types.IntegerType.get());
+    FunctionImpl impl = FunctionImpls.ofSql(runtime, sql);
+    FunctionDefinition definition = createMockDefinition(new FunctionParam[] {param}, impl);
+    return createMockFunctionWithDefinitions(name, definition);
+  }
+
+  private Function createMockFunctionWithDefinitions(
+      String name, FunctionDefinition... definitions) {
+    Function function = mock(Function.class);
+    when(function.name()).thenReturn(name);
+    when(function.functionType()).thenReturn(FunctionType.SCALAR);
+    when(function.deterministic()).thenReturn(true);
+    when(function.definitions()).thenReturn(definitions);
+    Audit audit = mock(Audit.class);
+    when(function.auditInfo()).thenReturn(audit);
+    return function;
+  }
+
+  private FunctionDefinition createMockDefinition(FunctionParam[] params, FunctionImpl... impls) {
+    FunctionDefinition definition = mock(FunctionDefinition.class);
+    when(definition.parameters()).thenReturn(params);
+    when(definition.impls()).thenReturn(impls);
+    return definition;
+  }
+
+  private FunctionParam createMockParam(String name, org.apache.gravitino.rel.types.Type type) {
+    FunctionParam param = mock(FunctionParam.class);
+    when(param.name()).thenReturn(name);
+    when(param.dataType()).thenReturn(type);
+    return param;
+  }
+}
diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorMetadataFunction.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorMetadataFunction.java
new file mode 100644
index 0000000000..2a9a20a075
--- /dev/null
+++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorMetadataFunction.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchFunctionException;
+import org.apache.gravitino.function.Function;
+import org.apache.gravitino.function.FunctionCatalog;
+import org.apache.gravitino.function.FunctionDefinition;
+import org.apache.gravitino.function.FunctionType;
+import org.apache.gravitino.rel.TableCatalog;
+import org.junit.jupiter.api.Test;
+
+public class TestCatalogConnectorMetadataFunction {
+
+  @Test
+  public void testSupportsFunctionsWhenCatalogSupports() {
+    CatalogConnectorMetadata metadata = createMetadataWithFunctionCatalog(true);
+    assertTrue(metadata.supportsFunctions());
+  }
+
+  @Test
+  public void testSupportsFunctionsWhenCatalogDoesNotSupport() {
+    CatalogConnectorMetadata metadata = createMetadataWithFunctionCatalog(false);
+    assertFalse(metadata.supportsFunctions());
+  }
+
+  @Test
+  public void testListFunctionInfosReturnsFunctions() {
+    Function mockFunction = createMockFunction("test_func");
+    FunctionCatalog functionCatalog = mock(FunctionCatalog.class);
+    when(functionCatalog.listFunctionInfos(any(Namespace.class)))
+        .thenReturn(new Function[] {mockFunction});
+
+    CatalogConnectorMetadata metadata = createMetadataWithMockFunctionCatalog(functionCatalog);
+    Function[] functions = metadata.listFunctionInfos("test_schema");
+    assertEquals(1, functions.length);
+    assertEquals("test_func", functions[0].name());
+  }
+
+  @Test
+  public void testListFunctionInfosThrowsWhenUnsupported() {
+    CatalogConnectorMetadata metadata = createMetadataWithFunctionCatalog(false);
+    assertThrows(
+        UnsupportedOperationException.class, () -> metadata.listFunctionInfos("test_schema"));
+  }
+
+  @Test
+  public void testGetFunctionReturnsFunction() {
+    Function mockFunction = createMockFunction("my_func");
+    FunctionCatalog functionCatalog = mock(FunctionCatalog.class);
+    when(functionCatalog.getFunction(any(NameIdentifier.class))).thenReturn(mockFunction);
+
+    CatalogConnectorMetadata metadata = createMetadataWithMockFunctionCatalog(functionCatalog);
+    Function function = metadata.getFunction("test_schema", "my_func");
+    assertNotNull(function);
+    assertEquals("my_func", function.name());
+  }
+
+  @Test
+  public void testGetFunctionThrowsWhenNotFound() {
+    FunctionCatalog functionCatalog = mock(FunctionCatalog.class);
+    when(functionCatalog.getFunction(any(NameIdentifier.class)))
+        .thenThrow(new NoSuchFunctionException("Function does not exist"));
+
+    CatalogConnectorMetadata metadata = createMetadataWithMockFunctionCatalog(functionCatalog);
+    assertThrows(
+        NoSuchFunctionException.class, () -> metadata.getFunction("test_schema", "no_such_func"));
+  }
+
+  @Test
+  public void testGetFunctionThrowsWhenUnsupported() {
+    CatalogConnectorMetadata metadata = createMetadataWithFunctionCatalog(false);
+    assertThrows(
+        UnsupportedOperationException.class, () -> metadata.getFunction("test_schema", "my_func"));
+  }
+
+  private CatalogConnectorMetadata createMetadataWithFunctionCatalog(boolean supportsFunctions) {
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.name()).thenReturn("test_catalog");
+    when(metalake.loadCatalog(anyString())).thenReturn(catalog);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    if (supportsFunctions) {
+      when(catalog.asFunctionCatalog()).thenReturn(mock(FunctionCatalog.class));
+    } else {
+      when(catalog.asFunctionCatalog())
+          .thenThrow(new UnsupportedOperationException("Not supported"));
+    }
+    return new CatalogConnectorMetadata(metalake, NameIdentifier.of("metalake", "test_catalog"));
+  }
+
+  private CatalogConnectorMetadata createMetadataWithMockFunctionCatalog(
+      FunctionCatalog functionCatalog) {
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.name()).thenReturn("test_catalog");
+    when(metalake.loadCatalog(anyString())).thenReturn(catalog);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    when(catalog.asFunctionCatalog()).thenReturn(functionCatalog);
+    return new CatalogConnectorMetadata(metalake, NameIdentifier.of("metalake", "test_catalog"));
+  }
+
+  private Function createMockFunction(String name) {
+    Function function = mock(Function.class);
+    when(function.name()).thenReturn(name);
+    when(function.functionType()).thenReturn(FunctionType.SCALAR);
+    when(function.deterministic()).thenReturn(true);
+    when(function.definitions()).thenReturn(new FunctionDefinition[0]);
+    Audit audit = mock(Audit.class);
+    when(audit.creator()).thenReturn("test");
+    when(audit.createTime()).thenReturn(Instant.now());
+    when(function.auditInfo()).thenReturn(audit);
+    return function;
+  }
+}
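Note for reviewers: the tests above assert signature tokens of the form `name(type1,type2)` (lowercase Trino type names, comma-separated, no spaces, `name()` for zero-arg functions). The sketch below is a standalone illustration of that token shape only; `SignatureTokenSketch` and its `signatureToken` helper are hypothetical names, not the connector's actual implementation.

```java
import java.util.List;

public class SignatureTokenSketch {
  // Hypothetical helper mirroring the token format asserted in the tests:
  // the function name, then the parameters' lowercase Trino type names
  // joined by commas inside parentheses.
  public static String signatureToken(String name, List<String> trinoTypeNames) {
    return name + "(" + String.join(",", trinoTypeNames) + ")";
  }

  public static void main(String[] args) {
    // Mirrors testSignatureTokenFormat: integer + varchar parameters.
    System.out.println(signatureToken("test_func", List.of("integer", "varchar")));
    // Mirrors testNoArgsFunction: zero parameters.
    System.out.println(signatureToken("const_func", List.of()));
  }
}
```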