This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f7cfeb534d92 [SPARK-49152][SQL][FOLLOWUP] DelegatingCatalogExtension should also use V1 commands
f7cfeb534d92 is described below

commit f7cfeb534d9285df381d147e01de47ec439c082e
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Sep 5 21:02:20 2024 +0800

    [SPARK-49152][SQL][FOLLOWUP] DelegatingCatalogExtension should also use V1 commands
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/47660. If users
    override `spark_catalog` with `DelegatingCatalogExtension`, we should still
    use v1 commands, as `DelegatingCatalogExtension` forwards requests to HMS and
    there are still behavior differences between v1 and v2 commands targeting HMS.
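
    For context, a minimal sketch of the kind of plugin this is about: a custom
    catalog that extends `DelegatingCatalogExtension` and is registered as
    `spark_catalog` (the class name below is hypothetical).

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

    // Forwards everything it does not override to the built-in session catalog
    // (and ultimately to the Hive metastore).
    class MySessionCatalog extends DelegatingCatalogExtension {
      override def loadTable(ident: Identifier): Table = {
        // custom behavior could be added here; the delegate handles the rest
        super.loadTable(ident)
      }
    }

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", classOf[MySessionCatalog].getName)
      .getOrCreate()
    ```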
    
    This PR also forces the use of v1 commands for certain commands that do not
    have a v2 version.
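
    For illustration, a few of the statements this covers (a sketch assuming a
    spark-shell session and an existing partitioned Hive table `t` in the session
    catalog):

    ```scala
    // None of these have a v2 implementation, so they must resolve to v1 commands.
    spark.sql("MSCK REPAIR TABLE t")                              // RepairTable
    spark.sql("ALTER TABLE t RECOVER PARTITIONS")                 // RecoverPartitions
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/data' INTO TABLE t")  // LoadData
    spark.sql("ALTER TABLE t SET SERDEPROPERTIES ('k' = 'v')")    // SetTableSerDeProperties
    ```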
    
    ### Why are the changes needed?
    
    Avoid introducing behavior changes to Spark plugins that implement
    `DelegatingCatalogExtension` to override `spark_catalog`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    new test case
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47995 from amaliujia/fix_catalog_v2.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Rui Wang <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 41 ++++++++++++++++------
 .../DataSourceV2SQLSessionCatalogSuite.scala       |  8 +++++
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 16 ++++++---
 3 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d569f1ed484c..02ad2e79a564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
@@ -284,10 +284,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident, columnNames, allColumns)
 
-    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
+    // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
+    case RepairTable(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        addPartitions,
+        dropPartitions) =>
       RepairTableCommand(ident, addPartitions, dropPartitions)
 
-    case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
+    // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
+    case LoadData(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        path,
+        isLocal,
+        isOverwrite,
+        partition) =>
       LoadDataCommand(
         ident,
         path,
@@ -336,7 +346,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
       ShowColumnsCommand(db, v1TableName, output)
 
-    case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
+    // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
+    case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
       RepairTableCommand(
         ident,
         enableAddPartitions = true,
@@ -364,8 +375,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         purge,
         retainData = false)
 
+    // V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
     case SetTableSerDeProperties(
-        ResolvedV1TableIdentifier(ident),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         serdeClassName,
         serdeProperties,
         partitionSpec) =>
@@ -380,10 +392,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
     case SetTableLocation(
-        ResolvedTable(catalog, _, t: V1Table, _),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         Some(partitionSpec),
-        location) if isSessionCatalog(catalog) =>
-      AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)
+        location) =>
+      AlterTableSetLocationCommand(ident, Some(partitionSpec), location)
 
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
       AlterViewAsCommand(ident, originalText, query)
@@ -600,6 +612,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
   }
 
+  object ResolvedV1TableIdentifierInSessionCatalog {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+        Some(t.catalogTable.identifier)
+      case _ => None
+    }
+  }
+
   object ResolvedV1TableOrViewIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedV1TableIdentifier(ident) => Some(ident)
@@ -684,7 +704,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
-    isSessionCatalog(catalog) &&
-      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty
+    isSessionCatalog(catalog) && (
+      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
+        catalog.isInstanceOf[DelegatingCatalogExtension])
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
index 95624f3f61c5..7463eb34d17f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
@@ -71,4 +71,12 @@ class DataSourceV2SQLSessionCatalogSuite
       sql(s"CREATE EXTERNAL TABLE t (i INT) USING $v2Format 
TBLPROPERTIES($prop)")
     }
   }
+
+  test("SPARK-49152: partition columns should be put at the end") {
+    withTable("t") {
+      sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)")
+      // partition columns should be put at the end.
+      assert(getTableMetadata("default.t").columns().map(_.name()) === Seq("c2", "c1"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 1d37c6aa4eb7..922bf01b541a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2125,10 +2125,18 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("REPLACE TABLE: v1 table") {
-    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING 
${classOf[SimpleScanSource].getName}")
-    val v2Catalog = catalog("spark_catalog").asTableCatalog
-    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
-    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
+    val e = intercept[AnalysisException] {
+      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING 
${classOf[SimpleScanSource].getName}")
+    }
+    checkError(
+      exception = e,
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      sqlState = "0A000",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`tbl`",
+        "operation" -> "REPLACE TABLE"
+      )
+    )
   }
 
   test("DeleteFrom: - delete with invalid predicate") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
