This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 37b39b41d07c [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command
37b39b41d07c is described below
commit 37b39b41d07cf8f39dd54cc18342e4d7b8bc71a3
Author: Wenchen Fan <[email protected]>
AuthorDate: Mon Sep 9 10:45:14 2024 +0800
[SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/47772 . The
behavior of SaveAsTable should not change when switching from the v1 command
to the v2 command. This is similar to
https://github.com/apache/spark/pull/47995. For the case of
`DelegatingCatalogExtension`, we need SaveAsTable to go through the V1
commands to stay consistent with the previous behavior.
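For reference, the resulting dispatch condition in `DataFrameWriterImpl` (a
condensed sketch of the change shown in the diff below; only the `canUseV2`
check is reproduced):

```scala
// saveAsTable takes the v2 path only when a v2 provider is found, or when a
// custom v2 session catalog is configured that is not a plain
// DelegatingCatalogExtension; in the latter case it falls back to the v1
// command, preserving the old behavior.
val canUseV2 = lookupV2Provider().isDefined ||
  (df.sparkSession.sessionState.conf.getConf(
    SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
    !df.sparkSession.sessionState.catalogManager
      .catalog(CatalogManager.SESSION_CATALOG_NAME)
      .isInstanceOf[DelegatingCatalogExtension])
```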
### Why are the changes needed?
This fixes a behavior regression.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
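To illustrate the affected scenario, a minimal sketch (assuming a hypothetical
user-defined `MyCatalog` and an active SparkSession named `spark`; the table
name and data source are illustrative only):

```scala
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

// Hypothetical session catalog that simply delegates to the built-in one.
class MyCatalog extends DelegatingCatalogExtension

// Register it as the v2 session catalog implementation.
spark.conf.set("spark.sql.catalog.spark_catalog", classOf[MyCatalog].getName)

// With this patch, saveAsTable on a v1 source still goes through the V1
// command, so the table behaves exactly as it did before.
spark.range(10).toDF().write
  .mode("overwrite")
  .format("csv")
  .saveAsTable("t")
```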
Closes #48019 from amaliujia/regress_v2.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/internal/DataFrameWriterImpl.scala | 6 ++++--
.../test/scala/org/apache/spark/sql/CollationSuite.scala | 3 ---
.../DataSourceV2DataFrameSessionCatalogSuite.scala | 12 ++++++++++--
.../spark/sql/connector/TestV2SessionCatalogBase.scala | 15 +++++----------
4 files changed, 19 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 7248a2d3f056..f0eef9ae1cbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -426,8 +426,10 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined ||
-      df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+    val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
+      SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+      !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
+        .isInstanceOf[DelegatingCatalogExtension])
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 5e7feec149c9..da8aad16f55d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}
class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -158,7 +157,6 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
test("disable bucketing on collated string column") {
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
def createTable(bucketColumns: String*): Unit = {
val tableName = "test_partition_tbl"
withTable(tableName) {
@@ -760,7 +758,6 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
test("disable partition on collated string column") {
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
def createTable(partitionColumns: String*): Unit = {
val tableName = "test_partition_tbl"
withTable(tableName) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 7bbb6485c273..fe078c5ae441 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
- val format = spark.sessionState.conf.defaultDataSourceName
- sql(s"CREATE TABLE same_name(id LONG) USING $format")
+ sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
spark.range(10).createTempView("same_name")
spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
assert(tableInfo.properties().get("provider") === v2Format)
}
}
+
+ test("SPARK-49246: saveAsTable with v1 format") {
+ withTable("t") {
+ sql("CREATE TABLE t(c INT) USING csv")
+ val df = spark.range(10).toDF()
+ df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
+ verifyTable("t", df)
+ }
+ }
}
class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index ff944dbb805c..2254abef3fcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
if (tables.containsKey(ident)) {
tables.get(ident)
} else {
- // Table was created through the built-in catalog
- super.loadTable(ident) match {
-        case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table
- case t =>
-          val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties())
- addTable(ident, table)
- table
- }
+      // Table was created through the built-in catalog via a v1 command. This is OK, as
+      // `loadTable` should always be invoked, and we set `tableCreated` to pass validation.
+ tableCreated.set(true)
+ super.loadTable(ident)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]