This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5054b41cffc5 [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
5054b41cffc5 is described below

commit 5054b41cffc55054918fbabe8981aa9a31f9e2cc
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Apr 29 10:53:20 2025 +0800

    [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
    
    For file source v1, if you do
    ```
    Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
    Seq(2 -> "b").toDF().write.mode("overwrite").option("path", 
p).saveAsTable("t")
    ```
    At the end, the data of `t` is `[2, "b"]`, because the v1 command `CreateDataSourceTableAsSelectCommand` uses `Overwrite` mode to write the data to the file directory.
    
    With DS v2, we use the v2 command `ReplaceTableAsSelect`, which uses `AppendData` to write to the new table. If the new table still contains the old data, which can happen for file source tables because DROP TABLE does not delete the external location, then the behavior differs from file source v1.
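    
    The test updated below makes the difference observable. A sketch, abridged from that test (the elided parts of the CREATE statement are in the diff of this commit, as are the expected rows; `path` still holds the rows written before the table is replaced):
    ```
    sql(s"CREATE OR REPLACE TABLE test ... OPTIONS (PATH '$path') AS VALUES (2, 3)")
    // with AppendData (before this fix):           Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)
    // with OverwriteByExpression (after this fix): Row(2, 3)
    checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
    ```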
    
    This PR fixes this inconsistency by using `OverwriteByExpression` in the `ReplaceTableAsSelect` physical commands.
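    
    The gist of the change, simplified from the diff below: `writeToTable` gains an `overwrite` flag, and the always-true delete condition tells `OverwriteByExpression` to replace all existing rows before writing:
    ```
    val writeCommand = if (overwrite) {
      // Literal.TrueLiteral as the delete filter: overwrite the whole table
      OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
    } else {
      AppendData.byPosition(relation, query, writeOptions)
    }
    ```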
    
    This fixes a potential inconsistency between file source v1 and v2. For now we are fine, as we don't support file source v2 tables yet. It is also helpful for third-party v2 sources that may retain old data in the new table.
    
    No user-facing change: file source v2 tables are not supported yet.
    
    Tested by updating an existing test.
    
    No generative AI tooling was used.
    
    Closes #50739 from cloud-fan/RTAS.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 25 ++++++++++++----------
 .../spark/sql/connector/DataSourceV2Suite.scala    |  2 +-
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 230864c0e267..40ac3a1e6ee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, ProjectingInternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, ReplaceDataProjections, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableWritePrivilege}
@@ -86,7 +86,7 @@ case class CreateTableAsSelectExec(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -124,7 +124,7 @@ case class AtomicCreateTableAsSelectExec(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -171,7 +171,7 @@ case class ReplaceTableAsSelectExec(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -225,7 +225,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -680,15 +680,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-
       DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
-
       Nil
     })(catchBlock = {
       table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f5ca885b1ad6..3eeed2e41754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -819,7 +819,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
              |OPTIONS (PATH '$path')
              |AS VALUES (2, 3)
              |""".stripMargin)
-        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
         // Replace the table without the path options.
         sql(
           s"""

