This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5054b41cffc5 [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append

5054b41cffc5 is described below

commit 5054b41cffc55054918fbabe8981aa9a31f9e2cc
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Apr 29 10:53:20 2025 +0800

    [SPARK-51936][SQL] ReplaceTableAsSelect should overwrite the new table instead of append

    ### What changes were proposed in this pull request?

    For file source v1, if you run
    ```
    Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
    Seq(2 -> "b").toDF().write.mode("overwrite").option("path", p).saveAsTable("t")
    ```
    the final data of `t` is `[2, "b"]`, because the v1 command
    `CreateDataSourceTableAsSelectCommand` writes the data to the file directory
    in `Overwrite` mode. With DS v2, we use the v2 command `ReplaceTableAsSelect`,
    which writes to the new table with `AppendData`. If the new table still
    contains the old data (which can happen for file source tables, since
    DROP TABLE does not delete the external location), the behavior diverges
    from file source v1.

    This PR fixes the inconsistency by using `OverwriteByExpression` instead of
    `AppendData` in the `ReplaceTableAsSelect` physical commands.

    ### Why are the changes needed?

    Fixes a potential inconsistency between file source v1 and v2. For now we
    are fine, as file source v2 tables are not supported yet. The change also
    helps third-party v2 sources that may retain old data in the new table.

    ### Does this PR introduce any user-facing change?

    No, file source v2 tables are not supported yet.

    ### How was this patch tested?

    Updated an existing test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #50739 from cloud-fan/RTAS.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/WriteToDataSourceV2Exec.scala | 25 ++++++++++++----------
 .../spark/sql/connector/DataSourceV2Suite.scala  |  2 +-
 2 files changed, 15 insertions(+), 12 deletions(-)
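For orientation before the diff: the four exec nodes now pass an `overwrite` flag into `writeToTable`, which picks the logical write command accordingly. Below is a condensed sketch of that selection; the `chooseWriteCommand` wrapper is illustrative and not part of the patch, but the two `byPosition` calls are exactly those the patch uses.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Illustrative wrapper around the logic the patch adds to writeToTable:
// CTAS appends into the freshly created table, while RTAS overwrites it so
// that stale files at a pre-existing external location cannot leak through.
def chooseWriteCommand(
    relation: DataSourceV2Relation,
    query: LogicalPlan,
    writeOptions: Map[String, String],
    overwrite: Boolean): LogicalPlan = {
  if (overwrite) {
    // Literal.TrueLiteral as the delete condition means "replace everything".
    OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
  } else {
    AppendData.byPosition(relation, query, writeOptions)
  }
}
```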
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 230864c0e267..40ac3a1e6ee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -24,8 +24,8 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, ProjectingInternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, ReplaceDataProjections, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableWritePrivilege}
@@ -86,7 +86,7 @@ case class CreateTableAsSelectExec(
         ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
         partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = false)
   }
 }

@@ -124,7 +124,7 @@ case class AtomicCreateTableAsSelectExec(
         ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
         partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, overwrite = false)
   }
 }

@@ -171,7 +171,7 @@ case class ReplaceTableAsSelectExec(
         ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
         partitioning.toArray, properties.asJava)
     ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }

@@ -225,7 +225,7 @@ case class AtomicReplaceTableAsSelectExec(
     }
     val table = Option(staged).getOrElse(
       catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    writeToTable(catalog, table, writeOptions, ident, query, overwrite = true)
   }
 }

@@ -680,15 +680,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      overwrite: Boolean): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      val append = AppendData.byPosition(relation, query, writeOptions)
-      val qe = session.sessionState.executePlan(append)
+      val writeCommand = if (overwrite) {
+        OverwriteByExpression.byPosition(relation, query, Literal.TrueLiteral, writeOptions)
+      } else {
+        AppendData.byPosition(relation, query, writeOptions)
+      }
+      val qe = session.sessionState.executePlan(writeCommand)
       qe.assertCommandExecuted()
-
       DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics)
-
       Nil
     })(catchBlock = {
       table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f5ca885b1ad6..3eeed2e41754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -819,7 +819,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
          |OPTIONS (PATH '$path')
          |AS VALUES (2, 3)
          |""".stripMargin)
-    checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), Row(1, 2), Row(2, 3)))
+    checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
     // Replace the table without the path options.
     sql(
       s"""
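For a concrete view of the invariant the updated test pins down, the file-source-v1 round trip from the commit message can be replayed end to end. This is a sketch for a spark-shell session; `p` stands for any scratch directory path and is not part of the patch.

```scala
// Sketch of the v1 behavior that the v2 RTAS path must now match.
// `p` is any scratch directory path; run in a spark-shell session.
import spark.implicits._

Seq(1 -> "a").toDF().write.option("path", p).saveAsTable("t")
// Overwrite mode drops and recreates `t`. Because `t` is an external table,
// the old files at `p` survive the drop, so the new write must overwrite
// them rather than append alongside them.
Seq(2 -> "b").toDF().write.mode("overwrite").option("path", p).saveAsTable("t")
assert(spark.table("t").collect().toSeq.map(_.toSeq) == Seq(Seq(2, "b")))
```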