This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3f22ef172173 [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command
3f22ef172173 is described below

commit 3f22ef1721738ebacba8a27854ea8f24e0c6e5b9
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Sep 9 10:45:14 2024 +0800

    [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command
    
    This is a follow-up of https://github.com/apache/spark/pull/47772 . The
    behavior of SaveAsTable should not be changed by switching from a v1 to a
    v2 command. This is similar to https://github.com/apache/spark/pull/47995 .
    In the case of `DelegatingCatalogExtension`, we need it to go through v1
    commands to stay consistent with the previous behavior.
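
    To make the decision concrete, here is a minimal sketch (not the patched
    Spark source itself) of the predicate this change introduces; the helper
    name `shouldUseV2Command` and its parameters are illustrative only:

        import org.apache.spark.sql.connector.catalog.{CatalogPlugin, DelegatingCatalogExtension}

        def shouldUseV2Command(
            v2ProviderDefined: Boolean,       // lookupV2Provider().isDefined
            customSessionCatalogSet: Boolean, // V2_SESSION_CATALOG_IMPLEMENTATION is set
            sessionCatalog: CatalogPlugin): Boolean = {
          // A DelegatingCatalogExtension merely forwards to the built-in
          // session catalog, so such setups keep going through v1 commands
          // to preserve the previous behavior.
          v2ProviderDefined ||
            (customSessionCatalogSet && !sessionCatalog.isInstanceOf[DelegatingCatalogExtension])
        }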
    
    Why are the changes needed? This fixes a behavior regression.

    Does this PR introduce any user-facing change? No.

    How was this patch tested? Unit tests.

    Was this patch authored or co-authored using generative AI tooling? No.
    
    Closes #48019 from amaliujia/regress_v2.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Rui Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 37b39b41d07cf8f39dd54cc18342e4d7b8bc71a3)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala |  6 ++++--
 .../DataSourceV2DataFrameSessionCatalogSuite.scala        | 12 ++++++++++--
 .../spark/sql/connector/TestV2SessionCatalogBase.scala    | 15 +++++----------
 3 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 84f02c723136..8c945ef0dbcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -565,8 +565,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined ||
-      df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+    val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
+        SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+        !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
+          .isInstanceOf[DelegatingCatalogExtension])
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 79fbabbeacaa..9dd20c906535 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
     "and a same-name temp view exist") {
     withTable("same_name") {
       withTempView("same_name") {
-        val format = spark.sessionState.conf.defaultDataSourceName
-        sql(s"CREATE TABLE same_name(id LONG) USING $format")
+        sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
         spark.range(10).createTempView("same_name")
         spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
         checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
+
+  test("SPARK-49246: saveAsTable with v1 format") {
+    withTable("t") {
+      sql("CREATE TABLE t(c INT) USING csv")
+      val df = spark.range(10).toDF()
+      df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
+      verifyTable("t", df)
+    }
+  }
 }
 
 class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 9144fb939045..bd13123d587f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 
@@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
     if (tables.containsKey(ident)) {
       tables.get(ident)
     } else {
-      // Table was created through the built-in catalog
-      super.loadTable(ident) match {
-        case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table
-        case t =>
-          val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties())
-          addTable(ident, table)
-          table
-      }
+      // The table was created through the built-in catalog via a v1 command. This is OK, as
+      // `loadTable` should always be invoked; we set `tableCreated` to pass validation.
+      tableCreated.set(true)
+      super.loadTable(ident)
     }
   }
 

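For context, below is a hypothetical end-to-end sketch of the scenario the new
test guards against; `com.example.MyCatalog` stands in for a user-defined
catalog extending `DelegatingCatalogExtension` and is not part of this patch:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // Register a custom v2 session catalog implementation
      // (SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).
      .config("spark.sql.catalog.spark_catalog", "com.example.MyCatalog")
      .getOrCreate()

    // Because the registered catalog is a DelegatingCatalogExtension, this
    // saveAsTable call is routed to the v1 command path again, matching the
    // behavior before SPARK-49246.
    spark.range(10).write.mode("overwrite").format("csv").saveAsTable("t")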

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
