This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 3f22ef172173 [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command
3f22ef172173 is described below
commit 3f22ef1721738ebacba8a27854ea8f24e0c6e5b9
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Sep 9 10:45:14 2024 +0800
[SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/47772 . The behavior of SaveAsTable should not be changed by switching from the v1 to the v2 command. This is similar to https://github.com/apache/spark/pull/47995 . For the case of `DelegatingCatalogExtension`, we need it to go through v1 commands to stay consistent with the previous behavior.
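To illustrate the scenario this guards (a minimal sketch, not part of the patch; `MyExtension` is a hypothetical class name):

    // Hypothetical illustration: a session catalog plugin that extends
    // DelegatingCatalogExtension and adds no behavior of its own.
    import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

    class MyExtension extends DelegatingCatalogExtension

    // Installed via spark.sql.catalog.spark_catalog=<fully.qualified.MyExtension>,
    // it is still the built-in session catalog underneath, so saveAsTable should
    // keep using the v1 command path rather than switching to v2:
    spark.range(10).write.format("csv").mode("overwrite").saveAsTable("t")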
### Why are the changes needed?

Behavior regression.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

### Was this patch authored or co-authored using generative AI tooling?

No
Closes #48019 from amaliujia/regress_v2.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 37b39b41d07cf8f39dd54cc18342e4d7b8bc71a3)
Signed-off-by: Wenchen Fan <[email protected]>
---
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala  |  6 ++++--
 .../DataSourceV2DataFrameSessionCatalogSuite.scala         | 12 ++++++++++--
 .../spark/sql/connector/TestV2SessionCatalogBase.scala     | 15 +++++----------
 3 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 84f02c723136..8c945ef0dbcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -565,8 +565,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined ||
-      df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+    val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
+      SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+      !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
+        .isInstanceOf[DelegatingCatalogExtension])

     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 79fbabbeacaa..9dd20c906535 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
       "and a same-name temp view exist") {
     withTable("same_name") {
       withTempView("same_name") {
-        val format = spark.sessionState.conf.defaultDataSourceName
-        sql(s"CREATE TABLE same_name(id LONG) USING $format")
+        sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
         spark.range(10).createTempView("same_name")
         spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
         checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
+
+  test("SPARK-49246: saveAsTable with v1 format") {
+    withTable("t") {
+      sql("CREATE TABLE t(c INT) USING csv")
+      val df = spark.range(10).toDF()
+      df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
+      verifyTable("t", df)
+    }
+  }
 }

 class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 9144fb939045..bd13123d587f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
@@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
     if (tables.containsKey(ident)) {
       tables.get(ident)
     } else {
-      // Table was created through the built-in catalog
-      super.loadTable(ident) match {
-        case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table
-        case t =>
-          val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties())
-          addTable(ident, table)
-          table
-      }
+      // Table was created through the built-in catalog via v1 command, this is OK as the
+      // `loadTable` should always be invoked, and we set the `tableCreated` to pass validation.
+      tableCreated.set(true)
+      super.loadTable(ident)
     }
   }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]