This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 37104b6b6922 [SPARK-54462][SQL] Add `SupportsV1OverwriteWithSaveAsTable` mixin for `TableProvider`
37104b6b6922 is described below
commit 37104b6b6922829c61b4afa9be02d4cd1deb7f37
Author: Juliusz Sompolski <Juliusz Sompolski>
AuthorDate: Fri Nov 28 20:15:51 2025 +0800
[SPARK-54462][SQL] Add `SupportsV1OverwriteWithSaveAsTable` mixin for `TableProvider`
### What changes were proposed in this pull request?
This PR adds a new marker interface SupportsV1OverwriteWithSaveAsTable that
data sources can implement to distinguish between DataFrameWriter V1
saveAsTable with SaveMode.Overwrite and DataFrameWriter V2
createOrReplace/replace operations.
* Added SupportsV1OverwriteWithSaveAsTable mixin interface to TableProvider
* Modified DataFrameWriter.saveAsTable to add an internal write option
(`__v1_save_as_table_overwrite=true`) when the provider implements this
interface and mode is Overwrite
* Added tests verifying the option is only added for providers that opt in
### Why are the changes needed?
Spark's SaveMode.Overwrite is documented as:
```
* if data/table already exists, existing data is expected to be overwritten
* by the contents of the DataFrame.
```
It does not define whether the table metadata (schema, etc.) should also be
overwritten.
However, DataFrameWriter V1 saveAsTable with Overwrite creates a
ReplaceTableAsSelect plan, the same plan that the DataFrameWriter V2
createOrReplace API creates, and that API is documented as:
```
* The output table's schema, partition layout, properties, and other configuration
* will be based on the contents of the data frame and the configuration set on this
* writer. If the table exists, its configuration and data will be replaced.
```
Therefore, for calls via DataFrameWriter V2 createOrReplace, the metadata
always needs to be replaced.
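For illustration, both of the calls below end up as the same ReplaceTableAsSelect
plan. This is only a usage sketch; the format name, table identifier and data are
placeholders (e.g. in spark-shell, where `spark` and its implicits are available):
```
import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")

// DataFrameWriter V1: documented to overwrite existing data, silent about metadata.
df.write.format("myV2Source").mode(SaveMode.Overwrite).saveAsTable("cat.ns.tbl")

// DataFrameWriter V2: documented to replace both the table configuration and the data.
df.writeTo("cat.ns.tbl").using("myV2Source").createOrReplace()
```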
Data sources migrating from V1 to V2 might have interpreted it differently.
In particular, the Delta Lake data source interprets the DataFrameWriter V1 API
documentation as not replacing the table schema unless the Delta-specific option
"overwriteSchema" is set to true. Changing the behaviour to the V2 semantics is
unfriendly to users, as it can corrupt tables: an operation that previously
overwrote only data would now also overwrite the table's schema, partitioning
info and other properties.
Since the created plan is exactly the same, Delta used a very ugly hack to detect
where the API call comes from based on the stack trace of the call. In Spark 4.1
in Connect mode this stopped working, because planning and execution of commands
are decoupled, so the stack trace no longer contains the point where the plan was
created.
To avoid introducing a behaviour change in the Delta data source with Spark 4.1
in Connect mode, Spark provides the new interface
SupportsV1OverwriteWithSaveAsTable, which makes DataFrameWriter V1 add an
explicit write option indicating to the data source that the call is coming from
DataFrameWriter V1 saveAsTable.
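A minimal sketch of how a V2 data source could opt in and then branch on the
option; the provider class and helper below are illustrative placeholders, not
Delta's actual implementation:
```
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsV1OverwriteWithSaveAsTable, Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Opting in only requires mixing in the marker interface (it extends TableProvider).
class MyTableProvider extends SupportsV1OverwriteWithSaveAsTable {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType()  // placeholder schema for this sketch
  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    throw new UnsupportedOperationException("illustrative stub")
}

object MyWritePath {
  // The source can then detect a DataFrameWriter V1 saveAsTable overwrite from its
  // write options and, for example, keep the existing table schema instead of replacing it.
  def isV1SaveAsTableOverwrite(options: CaseInsensitiveStringMap): Boolean =
    options.getBoolean(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME, false)
}
```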
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Spark tests added.
It was tested locally with the Delta Lake side of the changes. A corresponding
Delta Lake PR cannot be raised yet, because the Delta Lake master branch does not
yet cross-compile with Spark 4.1 (WIP).
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code opus-4.5
Closes #53215 from juliuszsompolski/RequiresDataFrameWriterV1SaveAsTableOverwriteWriteOption.
Authored-by: Juliusz Sompolski <Juliusz Sompolski>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../SupportsV1OverwriteWithSaveAsTable.java | 71 +++++++++++++++++
.../apache/spark/sql/classic/DataFrameWriter.scala | 21 ++++--
.../sql/connector/DataSourceV2DataFrameSuite.scala | 88 +++++++++++++++++++++-
.../spark/sql/connector/FakeV2Provider.scala | 12 ++-
4 files changed, 185 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsV1OverwriteWithSaveAsTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsV1OverwriteWithSaveAsTable.java
new file mode 100644
index 000000000000..63ee7493cb85
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsV1OverwriteWithSaveAsTable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A marker interface that can be mixed into a {@link TableProvider} to indicate that the data
+ * source needs to distinguish between DataFrameWriter V1 {@code saveAsTable} operations and
+ * DataFrameWriter V2 {@code createOrReplace}/{@code replace} operations.
+ * <p>
+ * Background: DataFrameWriter V1's {@code saveAsTable} with {@code SaveMode.Overwrite} creates
+ * a {@code ReplaceTableAsSelect} logical plan, which is identical to the plan created by
+ * DataFrameWriter V2's {@code createOrReplace}. However, the documented semantics can have
+ * different interpretations:
+ * <ul>
+ *   <li>V1 saveAsTable with Overwrite: "if data/table already exists, existing data is expected
+ *   to be overwritten by the contents of the DataFrame" - does not define behavior for
+ *   metadata (schema) overwriting</li>
+ *   <li>V2 createOrReplace: "The output table's schema, partition layout, properties, and other
+ *   configuration will be based on the contents of the data frame... If the table exists,
+ *   its configuration and data will be replaced"</li>
+ * </ul>
+ * <p>
+ * Data sources that migrated from V1 to V2 may have adopted different behaviors based on these
+ * documented semantics. For example, Delta Lake interprets V1 saveAsTable to not replace table
+ * schema unless the {@code overwriteSchema} option is explicitly set.
+ * <p>
+ * When a {@link TableProvider} implements this interface and
+ * {@link #addV1OverwriteWithSaveAsTableOption()} returns true, DataFrameWriter V1 will add an
+ * internal write option to indicate that the command originated from saveAsTable API.
+ * The option key used is defined by {@link #OPTION_NAME} and the value will be set to "true".
+ * This allows the data source to distinguish between the two APIs and apply appropriate
+ * semantics.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface SupportsV1OverwriteWithSaveAsTable extends TableProvider {
+  /**
+   * The name of the internal write option that indicates the command originated from
+   * DataFrameWriter V1 saveAsTable API.
+   */
+  String OPTION_NAME = "__v1_save_as_table_overwrite";
+
+  /**
+   * Returns whether to add the "__v1_save_as_table_overwrite" to write operations originating
+   * from DataFrameWriter V1 saveAsTable with mode Overwrite.
+   * Implementations can override this method to control when the option is added.
+   *
+   * @return true if the option should be added (default), false otherwise
+   */
+  default boolean addV1OverwriteWithSaveAsTableOption() {
+    return true;
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
index 2d3e4b84d9ae..52012a862942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
@@ -438,17 +438,18 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined || (hasCustomSessionCatalog &&
+    val v2ProviderOpt = lookupV2Provider()
+    val canUseV2 = v2ProviderOpt.isDefined || (hasCustomSessionCatalog &&
       !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
         .isInstanceOf[CatalogExtension])
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
-        saveAsTableCommand(catalog.asTableCatalog, ident, nameParts)
+        saveAsTableCommand(catalog.asTableCatalog, v2ProviderOpt, ident, nameParts)
       case nameParts @ SessionCatalogAndIdentifier(catalog, ident)
           if canUseV2 && ident.namespace().length <= 1 =>
-        saveAsTableCommand(catalog.asTableCatalog, ident, nameParts)
+        saveAsTableCommand(catalog.asTableCatalog, v2ProviderOpt, ident, nameParts)
       case AsTableIdentifier(tableIdentifier) =>
         saveAsV1TableCommand(tableIdentifier)
@@ -459,7 +460,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
   }
   private def saveAsTableCommand(
-      catalog: TableCatalog, ident: Identifier, nameParts: Seq[String]): LogicalPlan = {
+      catalog: TableCatalog,
+      v2ProviderOpt: Option[TableProvider],
+      ident: Identifier,
+      nameParts: Seq[String]): LogicalPlan = {
     val tableOpt = try Option(catalog.loadTable(ident, getWritePrivileges.toSet.asJava)) catch {
       case _: NoSuchTableException => None
     }
@@ -484,12 +488,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
           serde = None,
           external = false,
           constraints = Seq.empty)
+        val writeOptions = v2ProviderOpt match {
+          case Some(p: SupportsV1OverwriteWithSaveAsTable)
+              if p.addV1OverwriteWithSaveAsTableOption() =>
+            extraOptions + (SupportsV1OverwriteWithSaveAsTable.OPTION_NAME -> "true")
+          case _ =>
+            extraOptions
+        }
         ReplaceTableAsSelect(
           UnresolvedIdentifier(nameParts),
           partitioningAsV2,
           df.queryExecution.analyzed,
           tableSpec,
-          writeOptions = extraOptions.toMap,
+          writeOptions = writeOptions.toMap,
           orCreate = true) // Create the table if it doesn't exist
       case (other, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index d8d68f576e4e..5f4aee277831 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, TableInfo}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege
@@ -282,6 +282,92 @@ class DataSourceV2DataFrameSuite
}
}
+  test("RTAS adds V1 saveAsTable option when provider implements marker interface") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+    }
+    try {
+      spark.listenerManager.register(listener)
+      val t1 = "testcat.ns1.ns2.tbl"
+      val providerName = classOf[FakeV2ProviderWithV1SaveAsTableOverwriteWriteOption].getName
+
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.format(providerName).mode("overwrite").saveAsTable(t1)
+
+      sparkContext.listenerBus.waitUntilEmpty()
+      plan match {
+        case o: ReplaceTableAsSelect =>
+          assert(o.writeOptions.get(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME)
+            .contains("true"))
+        case other =>
+          fail(s"Expected ReplaceTableAsSelect, got ${other.getClass.getName}: $plan")
+      }
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  test("RTAS does not add V1 option when provider does not implement marker interface") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+    }
+    try {
+      spark.listenerManager.register(listener)
+      val t1 = "testcat.ns1.ns2.tbl2"
+      val providerName = classOf[FakeV2Provider].getName
+
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.format(providerName).mode("overwrite").saveAsTable(t1)
+
+      sparkContext.listenerBus.waitUntilEmpty()
+      plan match {
+        case o: ReplaceTableAsSelect =>
+          assert(!o.writeOptions.contains(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME))
+        case other =>
+          fail(s"Expected ReplaceTableAsSelect, got ${other.getClass.getName}: $plan")
+      }
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  test("RTAS does not add V1 option when addV1OverwriteWithSaveAsTableOption returns false") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+    }
+    try {
+      spark.listenerManager.register(listener)
+      val t1 = "testcat.ns1.ns2.tbl3"
+      val providerName =
+        classOf[FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled].getName
+
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.format(providerName).mode("overwrite").saveAsTable(t1)
+
+      sparkContext.listenerBus.waitUntilEmpty()
+      plan match {
+        case o: ReplaceTableAsSelect =>
+          assert(!o.writeOptions.contains(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME))
+        case other =>
+          fail(s"Expected ReplaceTableAsSelect, got ${other.getClass.getName}: $plan")
+      }
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
test("add columns with default values") {
val tableName = "testcat.ns1.ns2.tbl"
withTable(tableName) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
index 25d2d5a67d44..d9faa7d7473d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector
import java.util
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.catalog.{SupportsV1OverwriteWithSaveAsTable, Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder}
import org.apache.spark.sql.types.StructType
@@ -83,3 +83,13 @@ class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
     new FakeTable(schema, partitioning, new CaseInsensitiveStringMap(properties))
}
}
+
+class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOption
+ extends FakeV2Provider
+ with SupportsV1OverwriteWithSaveAsTable
+
+class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled
+ extends FakeV2Provider
+ with SupportsV1OverwriteWithSaveAsTable {
+ override def addV1OverwriteWithSaveAsTableOption(): Boolean = false
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]