This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37104b6b6922 [SPARK-54462][SQL] Add 
`SupportsV1OverwriteWithSaveAsTable` mixin for `TableProvider`
37104b6b6922 is described below

commit 37104b6b6922829c61b4afa9be02d4cd1deb7f37
Author: Juliusz Sompolski <Juliusz Sompolski>
AuthorDate: Fri Nov 28 20:15:51 2025 +0800

    [SPARK-54462][SQL] Add `SupportsV1OverwriteWithSaveAsTable` mixin for 
`TableProvider`
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new marker interface SupportsV1OverwriteWithSaveAsTable that 
data sources can implement to distinguish between DataFrameWriter V1 
saveAsTable with SaveMode.Overwrite and DataFrameWriter V2 
createOrReplace/replace operations.
    
    * Added SupportsV1OverwriteWithSaveAsTable mixin interface to TableProvider
    * Modified DataFrameWriter.saveAsTable to add an internal write option 
(`__v1_save_as_table_overwrite=true`) when the provider implements this 
interface and mode is Overwrite
    * Added tests verifying the option is only added for providers that opt-in
    
    ### Why are the changes needed?
    
    Spark's SaveMode.Overwrite is documented as:
    
    ```
            * if data/table already exists, existing data is expected to be 
overwritten
            * by the contents of the DataFrame.
    ```
    It does not define the behaviour of overwriting the table metadata (schema, 
etc).
    
    However, DataFrameWriter V1 creates a ReplaceTableAsSelect plan, which is 
the same as the plan of DataFrameWriterV2 createOrReplace API, which is 
documented as:
    
    ```
           * The output table's schema, partition layout, properties, and other 
configuration
           * will be based on the contents of the data frame and the 
configuration set on this
           * writer. If the table exists, its configuration and data will be 
replaced.
    ```
    
    Therefore, for calls via DataFrameWriter V2 createOrReplace, the metadata 
always needs to be replaced.
    
    Datasources migrating from V1 to V2 might have interpreted it differently.
    
    In particular, Delta Lake datasource interpretation of this API 
documentation of DataFrameWriter V1 is to not replace table schema, unless 
Delta-specific option "overwriteSchema" is set to true. Changing the bahaviour 
to the V2 semantics is unfriendly to the users, as it can cause corruption of 
the tables: an operations that overwrote only data before, will now also 
overwrite the table's schema, partitioning info and other properties.
    
    Since the created plan is exactly the same, Delta used a very ugly hack to 
detect where the API call is coming from based on the stack trace of the call.
    In Spark 4.1 in connect mode, this stopped working because planning and 
execution of the commands go decoupled, and the stack trace no longer contains 
this point where the plan got created.
    To not introduce a behaviour change in the Delta datasource with Spark 4.1 
in connect mode, Spark provides this new interface 
SupportsV1OverwriteWithSaveAsTable which will make DataFrameWriter V1 add an 
explicit storage option to indicate to Delta datasource that this call is 
coming from DataFrameWriter V1.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Spark tests added.
    It was tested locally with Delta Lake side of changes. It cannot yet be 
raised in a Delta Lake PR, because Delta Lake master branch does not yet 
cross-compile with Spark 4.1 (WIP).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Claude Code opus-4.5
    
    Closes #53215 from 
juliuszsompolski/RequiresDataFrameWriterV1SaveAsTableOverwriteWriteOption.
    
    Authored-by: Juliusz Sompolski <Juliusz Sompolski>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../SupportsV1OverwriteWithSaveAsTable.java        | 71 +++++++++++++++++
 .../apache/spark/sql/classic/DataFrameWriter.scala | 21 ++++--
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 88 +++++++++++++++++++++-
 .../spark/sql/connector/FakeV2Provider.scala       | 12 ++-
 4 files changed, 185 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsV1OverwriteWithSaveAsTable.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsV1OverwriteWithSaveAsTable.java
new file mode 100644
index 000000000000..63ee7493cb85
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsV1OverwriteWithSaveAsTable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A marker interface that can be mixed into a {@link TableProvider} to 
indicate that the data
+ * source needs to distinguish between DataFrameWriter V1 {@code saveAsTable} 
operations and
+ * DataFrameWriter V2 {@code createOrReplace}/{@code replace} operations.
+ * <p>
+ * Background: DataFrameWriter V1's {@code saveAsTable} with {@code 
SaveMode.Overwrite} creates
+ * a {@code ReplaceTableAsSelect} logical plan, which is identical to the plan 
created by
+ * DataFrameWriter V2's {@code createOrReplace}. However, the documented 
semantics can have
+ * different interpretations:
+ * <ul>
+ *   <li>V1 saveAsTable with Overwrite: "if data/table already exists, 
existing data is expected
+ *       to be overwritten by the contents of the DataFrame" - does not define 
behavior for
+ *       metadata (schema) overwriting</li>
+ *   <li>V2 createOrReplace: "The output table's schema, partition layout, 
properties, and other
+ *       configuration will be based on the contents of the data frame... If 
the table exists,
+ *       its configuration and data will be replaced"</li>
+ * </ul>
+ * <p>
+ * Data sources that migrated from V1 to V2 may have adopted different 
behaviors based on these
+ * documented semantics. For example, Delta Lake interprets V1 saveAsTable to 
not replace table
+ * schema unless the {@code overwriteSchema} option is explicitly set.
+ * <p>
+ * When a {@link TableProvider} implements this interface and
+ * {@link #addV1OverwriteWithSaveAsTableOption()} returns true, 
DataFrameWriter V1 will add an
+ * internal write option to indicate that the command originated from 
saveAsTable API.
+ * The option key used is defined by {@link #OPTION_NAME} and the value will 
be set to "true".
+ * This allows the data source to distinguish between the two APIs and apply 
appropriate
+ * semantics.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface SupportsV1OverwriteWithSaveAsTable extends TableProvider {
+  /**
+   * The name of the internal write option that indicates the command 
originated from
+   * DataFrameWriter V1 saveAsTable API.
+   */
+  String OPTION_NAME = "__v1_save_as_table_overwrite";
+
+  /**
+   * Returns whether to add the "__v1_save_as_table_overwrite" to write 
operations originating
+   * from DataFrameWriter V1 saveAsTable with mode Overwrite.
+   * Implementations can override this method to control when the option is 
added.
+   *
+   * @return true if the option should be added (default), false otherwise
+   */
+  default boolean addV1OverwriteWithSaveAsTableOption() {
+    return true;
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
index 2d3e4b84d9ae..52012a862942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala
@@ -438,17 +438,18 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) extends sql.DataFram
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined || (hasCustomSessionCatalog &&
+    val v2ProviderOpt = lookupV2Provider()
+    val canUseV2 = v2ProviderOpt.isDefined || (hasCustomSessionCatalog &&
         
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
           .isInstanceOf[CatalogExtension])
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
-        saveAsTableCommand(catalog.asTableCatalog, ident, nameParts)
+        saveAsTableCommand(catalog.asTableCatalog, v2ProviderOpt, ident, 
nameParts)
 
       case nameParts @ SessionCatalogAndIdentifier(catalog, ident)
           if canUseV2 && ident.namespace().length <= 1 =>
-        saveAsTableCommand(catalog.asTableCatalog, ident, nameParts)
+        saveAsTableCommand(catalog.asTableCatalog, v2ProviderOpt, ident, 
nameParts)
 
       case AsTableIdentifier(tableIdentifier) =>
         saveAsV1TableCommand(tableIdentifier)
@@ -459,7 +460,10 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) extends sql.DataFram
   }
 
   private def saveAsTableCommand(
-      catalog: TableCatalog, ident: Identifier, nameParts: Seq[String]): 
LogicalPlan = {
+      catalog: TableCatalog,
+      v2ProviderOpt: Option[TableProvider],
+      ident: Identifier,
+      nameParts: Seq[String]): LogicalPlan = {
     val tableOpt = try Option(catalog.loadTable(ident, 
getWritePrivileges.toSet.asJava)) catch {
       case _: NoSuchTableException => None
     }
@@ -484,12 +488,19 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) extends sql.DataFram
           serde = None,
           external = false,
           constraints = Seq.empty)
+        val writeOptions = v2ProviderOpt match {
+          case Some(p: SupportsV1OverwriteWithSaveAsTable)
+              if p.addV1OverwriteWithSaveAsTableOption() =>
+            extraOptions + (SupportsV1OverwriteWithSaveAsTable.OPTION_NAME -> 
"true")
+          case _ =>
+            extraOptions
+        }
         ReplaceTableAsSelect(
           UnresolvedIdentifier(nameParts),
           partitioningAsV2,
           df.queryExecution.analyzed,
           tableSpec,
-          writeOptions = extraOptions.toMap,
+          writeOptions = writeOptions.toMap,
           orCreate = true) // Create the table if it doesn't exist
 
       case (other, _) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index d8d68f576e4e..5f4aee277831 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, 
Row, SaveMode}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, 
DefaultValue, Identifier, InMemoryTableCatalog, TableInfo}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, 
DefaultValue, Identifier, InMemoryTableCatalog, 
SupportsV1OverwriteWithSaveAsTable, TableInfo}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege
@@ -282,6 +282,92 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("RTAS adds V1 saveAsTable option when provider implements marker 
interface") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
+    }
+    try {
+      spark.listenerManager.register(listener)
+      val t1 = "testcat.ns1.ns2.tbl"
+      val providerName = 
classOf[FakeV2ProviderWithV1SaveAsTableOverwriteWriteOption].getName
+
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.format(providerName).mode("overwrite").saveAsTable(t1)
+
+      sparkContext.listenerBus.waitUntilEmpty()
+      plan match {
+        case o: ReplaceTableAsSelect =>
+          
assert(o.writeOptions.get(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME)
+            .contains("true"))
+        case other =>
+          fail(s"Expected ReplaceTableAsSelect, got ${other.getClass.getName}: 
$plan")
+      }
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  test("RTAS does not add V1 option when provider does not implement marker 
interface") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
+    }
+    try {
+      spark.listenerManager.register(listener)
+      val t1 = "testcat.ns1.ns2.tbl2"
+      val providerName = classOf[FakeV2Provider].getName
+
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.format(providerName).mode("overwrite").saveAsTable(t1)
+
+      sparkContext.listenerBus.waitUntilEmpty()
+      plan match {
+        case o: ReplaceTableAsSelect =>
+          
assert(!o.writeOptions.contains(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME))
+        case other =>
+          fail(s"Expected ReplaceTableAsSelect, got ${other.getClass.getName}: 
$plan")
+      }
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  test("RTAS does not add V1 option when addV1OverwriteWithSaveAsTableOption 
returns false") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        plan = qe.analyzed
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
+    }
+    try {
+      spark.listenerManager.register(listener)
+      val t1 = "testcat.ns1.ns2.tbl3"
+      val providerName =
+        
classOf[FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled].getName
+
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.format(providerName).mode("overwrite").saveAsTable(t1)
+
+      sparkContext.listenerBus.waitUntilEmpty()
+      plan match {
+        case o: ReplaceTableAsSelect =>
+          
assert(!o.writeOptions.contains(SupportsV1OverwriteWithSaveAsTable.OPTION_NAME))
+        case other =>
+          fail(s"Expected ReplaceTableAsSelect, got ${other.getClass.getName}: 
$plan")
+      }
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
   test("add columns with default values") {
     val tableName = "testcat.ns1.ns2.tbl"
     withTable(tableName) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
index 25d2d5a67d44..d9faa7d7473d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector
 
 import java.util
 
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import 
org.apache.spark.sql.connector.catalog.{SupportsV1OverwriteWithSaveAsTable, 
Table, TableProvider}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder}
 import org.apache.spark.sql.types.StructType
@@ -83,3 +83,13 @@ class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
     new FakeTable(schema, partitioning, new 
CaseInsensitiveStringMap(properties))
   }
 }
+
+class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOption
+    extends FakeV2Provider
+    with SupportsV1OverwriteWithSaveAsTable
+
+class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled
+    extends FakeV2Provider
+    with SupportsV1OverwriteWithSaveAsTable {
+  override def addV1OverwriteWithSaveAsTableOption(): Boolean = false
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to