This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dfd7cde91c5d [SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer
dfd7cde91c5d is described below
commit dfd7cde91c5d6f034a11ea492be83afaf771ceb6
Author: Yihong He <[email protected]>
AuthorDate: Wed Nov 8 18:22:40 2023 -0800
[SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer
### What changes were proposed in this pull request?
- Refactor the Catalog Function APIs (`getFunction`, `functionExists`) to resolve functions through the analyzer
### Why are the changes needed?
- Less duplicated logic. We should not invoke catalog APIs directly, but go through the analyzer so that the Catalog APIs share the same function-resolution rules as regular queries.
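In sketch form, the pattern the refactored APIs now share (condensed from the diff below; the identifier `Seq("my_db", "my_func")` is illustrative, and `sparkSession` is the session reference held by `CatalogImpl`, so this is internal code rather than public API):

```scala
import org.apache.spark.sql.catalyst.analysis.{ResolvedNonPersistentFunc, ResolvedPersistentFunc, UnresolvedFunctionName}

// Build an unresolved plan node for the function name and let the analyzer
// resolve it, applying the same rules ordinary queries use for temp
// functions, built-ins, persistent functions, and v2 catalogs.
val plan = UnresolvedFunctionName(Seq("my_db", "my_func"), "Catalog.functionExists", false, None)
val isFunction = sparkSession.sessionState.executePlan(plan).analyzed match {
  case _: ResolvedPersistentFunc | _: ResolvedNonPersistentFunc => true
  case _ => false
}
```

When resolution fails, the analyzer throws an `AnalysisException` with the `UNRESOLVED_ROUTINE` error class, which the new private `functionExists` helper catches and maps to `false`.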
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43720 from heyihong/SPARK-45842.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/internal/CatalogImpl.scala | 59 +++++++++++++---------
1 file changed, 35 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5650e9d2399c..b1ad454fc041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -22,14 +22,14 @@ import scala.util.control.NonFatal
import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -284,6 +284,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(functions.result(), sparkSession)
   }
+  private def toFunctionIdent(functionName: String): Seq[String] = {
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first.
+    if (parsed.length <= 2 &&
+      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
+    }
+  }
+
+  private def functionExists(ident: Seq[String]): Boolean = {
+    val plan =
+      UnresolvedFunctionName(ident, CatalogImpl.FUNCTION_EXISTS_COMMAND_NAME, false, None)
+    try {
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case _: ResolvedPersistentFunc => true
+        case _: ResolvedNonPersistentFunc => true
+        case _ => false
+      }
+    } catch {
+      case e: AnalysisException if e.getErrorClass == "UNRESOLVED_ROUTINE" => false
+    }
+  }
+
   private def makeFunction(ident: Seq[String]): Function = {
     val plan = UnresolvedFunctionName(ident, "Catalog.makeFunction", false, None)
     sparkSession.sessionState.executePlan(plan).analyzed match {
@@ -465,17 +492,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * function. This throws an `AnalysisException` when no `Function` can be found.
    */
   override def getFunction(functionName: String): Function = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first.
-    val nameParts = if (parsed.length <= 2 &&
-      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
-      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
-      qualifyV1Ident(parsed)
-    } else {
-      parsed
-    }
-    makeFunction(nameParts)
+    makeFunction(toFunctionIdent(functionName))
   }

   /**
@@ -540,23 +557,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * or a function.
    */
   override def functionExists(functionName: String): Boolean = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first. This also checks if it's a built-in/temp function.
-    (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
-      val plan = UnresolvedIdentifier(parsed)
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-        case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
-        case _ => false
-      }
-    }
+    functionExists(toFunctionIdent(functionName))
   }

   /**
    * Checks if the function with the specified name exists in the specified database.
    */
   override def functionExists(dbName: String, functionName: String): Boolean = {
-    sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName)))
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+    // the Hive Metastore.
+    functionExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
   }

   /**
@@ -942,4 +952,5 @@ private[sql] object CatalogImpl {
     new Dataset[T](queryExecution, enc)
   }

+  private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists"
 }
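For reference, the public calls that exercise the new path (`spark` is assumed to be an active `SparkSession`; the function and database names are illustrative):

```scala
// Resolved via toFunctionIdent plus the analyzer-backed functionExists
// helper; covers temp, built-in, and persistent functions.
spark.catalog.functionExists("my_func")

// Always qualified against the session (Hive Metastore) catalog, per the
// backward-compatibility comment in the diff above.
spark.catalog.functionExists("my_db", "my_func")
```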