This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 176e92a077b1 [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
176e92a077b1 is described below

commit 176e92a077b1d6cbb77088b5036c734931d1a230
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Apr 29 10:53:20 2025 +0800

    [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append
    
    For file source v1, if you do
    ```
    Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
    Seq(2 -> "b").toDF().write.mode("overwrite").option("path", 
p).saveAsTable("t")
    ```
    At the end, the data of `t` is `[2, "b"]`, because the v1 command `CreateDataSourceTableAsSelectCommand` uses `Overwrite` mode to write the data to the file directory.
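
    For example, reading the table back shows that only the second write survives (a quick check using the `t` from the snippet above):
    ```
    spark.table("t").collect()  // Array([2,b]): the overwrite replaced the first write
    ```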
    
    With DS v2, the same operation runs the v2 command `ReplaceTableAsSelect`, which uses `AppendData` to write to the new table. If the new table still contains the old data, which can happen for file source tables because DROP TABLE does not delete the external location, the behavior diverges from file source v1.
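
    A hypothetical illustration of the pre-fix divergence, assuming a third-party v2 catalog `cat` whose dropped tables leave files behind at their external location (catalog, table, and path names are made up for the example):
    ```
    sql("CREATE TABLE cat.db.t USING parquet LOCATION '/tmp/t' AS SELECT 1 AS id")
    // REPLACE TABLE drops and re-creates the table, but the old files remain at
    // the external location. Before this fix the new rows were appended, so a
    // scan could return both 1 and 2; file source v1 would return only 2.
    sql("REPLACE TABLE cat.db.t USING parquet LOCATION '/tmp/t' AS SELECT 2 AS id")
    ```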
    
    This PR fixes the inconsistency by using `OverwriteByExpression` instead of `AppendData` in the `ReplaceTableAsSelect` physical commands.
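
    Conceptually, the change swaps the logical write plan built in `writeToTable`; a minimal sketch of the before/after plans, where `relation`, `query`, and `writeOptions` are the values already in scope there:
    ```
    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{AppendData, OverwriteByExpression}

    // Before: append the query result to whatever the new table already holds.
    val append = AppendData.byPosition(relation, query, writeOptions)
    // After: an always-true overwrite condition matches every existing row, so
    // the table's contents are replaced rather than appended to.
    val overwrite = OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
    ```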
    
    This fixes a potential inconsistency between file source v1 and v2; for now we are unaffected because file source v2 tables are not supported yet. It is also helpful for third-party v2 sources that may retain old data in the new table.
    
    Does this PR introduce any user-facing change?
    No, file source v2 tables are not supported yet.

    How was this patch tested?
    Updated an existing test.

    Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50739 from cloud-fan/RTAS.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c99e2bba2e96..12ab7c57ea51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableWritePrivilege}
@@ -85,7 +85,7 @@ case class CreateTableAsSelectExec(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
    ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -120,7 +120,7 @@ case class AtomicCreateTableAsSelectExec(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
    ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }
 
@@ -167,7 +167,7 @@ case class ReplaceTableAsSelectExec(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
    ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -218,7 +218,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }
 
@@ -574,18 +574,21 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
      val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-
       table match {
         case st: StagedTable => st.commitStagedChanges()
         case _ =>
       }
-
       Nil
     })(catchBlock = {
       table match {

