This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 27987536be38 [SPARK-47800][SQL] Create new method for identifier to tableIdentifier conversion
27987536be38 is described below
commit 27987536be3810c3e61767d7abd33f3886411c5c
Author: Uros Stankovic <[email protected]>
AuthorDate: Fri Apr 12 14:48:43 2024 +0800
[SPARK-47800][SQL] Create new method for identifier to tableIdentifier conversion
### What changes were proposed in this pull request?
Introduce a new method for converting a catalog identifier (the newer API, available since Spark 3.0.0) to a table identifier (the older API).
### Why are the changes needed?
The code is cleaner, and DataSourceV2Strategy is no longer responsible for the conversion.
The new method can also be reused elsewhere.
The conversion is slightly improved as well: the schema is no longer required and may be absent.
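
For illustration, a rough sketch of the conversion behavior (not part of this patch; the identifier and catalog names below are made up, and CatalogV2Implicits is private[sql], so this compiles only within Spark's own sql packages):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
    import org.apache.spark.sql.connector.catalog.Identifier

    // Single-level namespace: conversion succeeds and keeps the catalog name.
    Identifier.of(Array("db"), "tbl").asTableIdentifierOpt(Some("cat"))
    // => Some(TableIdentifier("tbl", Some("db"), Some("cat")))

    // Empty namespace: conversion succeeds, but the catalog name is not used.
    Identifier.of(Array.empty[String], "tbl").asTableIdentifierOpt(Some("cat"))
    // => Some(TableIdentifier("tbl"))

    // Nested (multi-part) namespace: TableIdentifier cannot represent it.
    Identifier.of(Array("a", "b"), "tbl").asTableIdentifierOpt(Some("cat"))
    // => None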
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No tests added, since this is a minor refactoring.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45985 from urosstan-db/SPARK-47800-v2-Identifier-to-table-identifier-method.
Authored-by: Uros Stankovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/connector/catalog/CatalogV2Implicits.scala | 20 ++++++++++++++++++++
.../datasources/v2/DataSourceV2Strategy.scala      |  9 ++-------
2 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index bf4cd2eedc83..65bdae85be12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -164,6 +164,26 @@ private[sql] object CatalogV2Implicits {
       case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
     }
 
+    /**
+     * Tries to convert catalog identifier to the table identifier. Table identifier does not
+     * support multiple namespaces (nested namespaces), so if identifier contains nested namespace,
+     * conversion cannot be done
+     * @param catalogName Catalog name. Identifier represents just one object in catalog, so it has
+     *                    no catalog name needed for table identifier creation
+     * @return Table identifier if conversion can be done, None otherwise
+     */
+    def asTableIdentifierOpt(catalogName: Option[String]): Option[TableIdentifier] = {
+      ident.namespace().toImmutableArraySeq match {
+        case Seq(singleNamespace) =>
+          Some(TableIdentifier(ident.name(), Some(singleNamespace), catalogName))
+        case Seq() =>
+          // If namespace is not given, catalog will not be used
+          Some(TableIdentifier(ident.name()))
+        case _ =>
+          None
+      }
+    }
+
     def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
       case ns if ns.isEmpty => FunctionIdentifier(ident.name())
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index e7960f8b61ae..828d737f93fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,6 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.expressions
@@ -118,12 +117,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         val rdd = v1Relation.buildScan()
         val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
 
-        val tableIdentifier = v2Relation.identifier.map(_.asMultipartIdentifier) match {
-          case Some(Seq(schema, tableName)) =>
-            Some(new TableIdentifier(tableName, Some(schema), v2Relation.catalog.map(_.name())))
-          case _ =>
-            None
-        }
+        val catalogName = v2Relation.catalog.map(_.name())
+        val tableIdentifier = v2Relation.identifier.flatMap(_.asTableIdentifierOpt(catalogName))
 
         val dsScan = RowDataSourceScanExec(
           output,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]