This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 825682d3c95e [SPARK-51191][SQL] Validate default values handling in DELETE, UPDATE, MERGE
825682d3c95e is described below

commit 825682d3c95e078ba744a29880d5939f3634c915
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Mar 28 15:31:11 2025 -0700

    [SPARK-51191][SQL] Validate default values handling in DELETE, UPDATE, MERGE
    
    ### What changes were proposed in this pull request?
    
    This PR adds tests for default values handling in DELETE, UPDATE, MERGE.
    
    ### Why are the changes needed?
    
    These tests are needed to ensure default values are handled correctly in all DML operations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR consists of tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50271 from aokolnychyi/spark-51191.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../InMemoryRowLevelOperationTableCatalog.scala    | 23 +++++++
 .../sql/connector/DeleteFromTableSuiteBase.scala   | 78 ++++++++++++++++++++++
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 52 +++++++++++++++
 .../spark/sql/connector/UpdateTableSuiteBase.scala | 37 ++++++++++
 4 files changed, 190 insertions(+)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index deb200650bd5..f5a69cd96d96 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -43,4 +43,27 @@ class InMemoryRowLevelOperationTableCatalog extends 
InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
+    val properties = CatalogV2Util.applyPropertiesChanges(table.properties, 
changes)
+    val schema = CatalogV2Util.applySchemaChanges(
+      table.schema,
+      changes,
+      tableProvider = Some("in-memory"),
+      statementType = "ALTER TABLE")
+    val partitioning = CatalogV2Util.applyClusterByChanges(table.partitioning, 
schema, changes)
+
+    // fail if the last column in the schema was dropped
+    if (schema.fields.isEmpty) {
+      throw new IllegalArgumentException(s"Cannot drop all fields")
+    }
+
+    val newTable = new InMemoryRowLevelOperationTable(table.name, schema, 
partitioning, properties)
+    newTable.withData(table.data)
+
+    tables.put(ident, newTable)
+
+    newTable
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 94109681b8ec..585480ace725 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -24,6 +24,84 @@ abstract class DeleteFromTableSuiteBase extends 
RowLevelOperationSuiteBase {
 
   import testImplicits._
 
+  test("delete from table containing added column with default value") {
+    createAndInitTable("pk INT NOT NULL, dep STRING", """{ "pk": 1, "dep": 
"hr" }""")
+
+    sql(s"ALTER TABLE $tableNameAsString ADD COLUMN txt STRING DEFAULT 
'initial-text'")
+
+    append("pk INT, dep STRING",
+      """{ "pk": 2, "dep": "hr" }
+        |{ "pk": 3, "dep": "software" }
+        |{ "pk": 4, "dep": "hr" }
+        |""".stripMargin)
+
+    sql(s"ALTER TABLE $tableNameAsString ALTER COLUMN txt SET DEFAULT 
'new-text'")
+
+    append("pk INT, dep STRING",
+      """{ "pk": 5, "dep": "hr" }
+        |{ "pk": 6, "dep": "hr" }
+        |""".stripMargin)
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, "hr", "initial-text"),
+        Row(2, "hr", "initial-text"),
+        Row(3, "software", "initial-text"),
+        Row(4, "hr", "initial-text"),
+        Row(5, "hr", "new-text"),
+        Row(6, "hr", "new-text")))
+
+    sql(s"DELETE FROM $tableNameAsString WHERE pk IN (2, 5)")
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, "hr", "initial-text"),
+        Row(3, "software", "initial-text"),
+        Row(4, "hr", "initial-text"),
+        Row(6, "hr", "new-text")))
+  }
+
+  test("delete from table containing struct column with default value") {
+    sql(
+      s"""CREATE TABLE $tableNameAsString (
+         | pk INT NOT NULL,
+         | complex STRUCT<c1:INT,c2:STRING> DEFAULT struct(-1, 'unknown'),
+         | dep STRING)
+         |PARTITIONED BY (dep)
+         |""".stripMargin)
+
+    append("pk INT NOT NULL, dep STRING",
+      """{ "pk": 1, "dep": "hr" }
+        |{ "pk": 2, "dep": "hr" }
+        |{ "pk": 3, "dep": "hr" }
+        |""".stripMargin)
+
+    append("pk INT NOT NULL, complex STRUCT<c1:INT,c2:STRING>, dep STRING",
+      """{ "pk": 4, "dep": "hr" }
+        |{ "pk": 5, "complex": { "c1": 5, "c2": "v5" }, "dep": "hr" }
+        |""".stripMargin)
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, Row(-1, "unknown"), "hr"),
+        Row(2, Row(-1, "unknown"), "hr"),
+        Row(3, Row(-1, "unknown"), "hr"),
+        Row(4, null, "hr"),
+        Row(5, Row(5, "v5"), "hr")))
+
+    sql(s"DELETE FROM $tableNameAsString WHERE pk < 3")
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(3, Row(-1, "unknown"), "hr"),
+        Row(4, null, "hr"),
+        Row(5, Row(5, "v5"), "hr")))
+  }
+
   test("EXPLAIN only delete") {
     createAndInitTable("id INT, dep STRING", """{ "id": 1, "dep": "hr" }""")
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 053616c88d63..f74ad0a00247 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -32,6 +32,58 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase {
 
   import testImplicits._
 
+  test("merge into table containing added column with default value") {
+    withTempView("source") {
+      sql(
+        s"""CREATE TABLE $tableNameAsString (
+           | pk INT NOT NULL,
+           | salary INT NOT NULL DEFAULT -1,
+           | dep STRING)
+           |PARTITIONED BY (dep)
+           |""".stripMargin)
+
+      append("pk INT NOT NULL, dep STRING",
+        """{ "pk": 1, "dep": "hr" }
+          |{ "pk": 2, "dep": "hr" }
+          |{ "pk": 3, "dep": "hr" }
+          |""".stripMargin)
+
+      sql(s"ALTER TABLE $tableNameAsString ADD COLUMN txt STRING DEFAULT 
'initial-text'")
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, -1, "hr", "initial-text"),
+          Row(2, -1, "hr", "initial-text"),
+          Row(3, -1, "hr", "initial-text")))
+
+      val sourceRows = Seq(
+        (1, 100, "hr"),
+        (4, 400, "hr"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      sql(
+        s"""MERGE INTO $tableNameAsString t
+           |USING source s
+           |ON t.pk = s.pk
+           |WHEN MATCHED THEN
+           | UPDATE SET t.salary = s.salary, t.txt = DEFAULT
+           |WHEN NOT MATCHED THEN
+           | INSERT (pk, salary, dep) VALUES (s.pk, DEFAULT, s.dep)
+           |WHEN NOT MATCHED BY SOURCE THEN
+           | UPDATE SET salary = DEFAULT
+           |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr", "initial-text"),
+          Row(2, -1, "hr", "initial-text"),
+          Row(3, -1, "hr", "initial-text"),
+          Row(4, -1, "hr", "initial-text")))
+    }
+  }
+
   test("SPARK-45974: merge into non filter attributes table") {
     val tableName: String = "cat.ns1.non_partitioned_table"
     withTable(tableName) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
index f659ca6329e2..d33ad2494c3c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
@@ -27,6 +27,43 @@ abstract class UpdateTableSuiteBase extends 
RowLevelOperationSuiteBase {
 
   import testImplicits._
 
+  test("update table containing added column with default value") {
+    createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+      """{ "pk": 1, "salary": 100, "dep": "hr" }
+        |{ "pk": 2, "salary": 200, "dep": "software" }
+        |""".stripMargin)
+
+    sql(s"ALTER TABLE $tableNameAsString ADD COLUMN txt STRING DEFAULT 
'initial-text'")
+
+    append("pk INT, salary INT, dep STRING, txt STRING",
+      """{ "pk": 3, "salary": 300, "dep": "hr", "txt": "explicit-text" }
+        |{ "pk": 4, "salary": 400, "dep": "software", "txt": "explicit-text" }
+        |{ "pk": 5, "salary": 500, "dep": "hr" }
+        |""".stripMargin)
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, 100, "hr", "initial-text"),
+        Row(2, 200, "software", "initial-text"),
+        Row(3, 300, "hr", "explicit-text"),
+        Row(4, 400, "software", "explicit-text"),
+        Row(5, 500, "hr", null)))
+
+    sql(s"ALTER TABLE $tableNameAsString ALTER COLUMN txt SET DEFAULT 
'new-text'")
+
+    sql(s"UPDATE $tableNameAsString SET txt = DEFAULT WHERE pk IN (2, 8, 11)")
+
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(
+        Row(1, 100, "hr", "initial-text"),
+        Row(2, 200, "software", "new-text"),
+        Row(3, 300, "hr", "explicit-text"),
+        Row(4, 400, "software", "explicit-text"),
+        Row(5, 500, "hr", null)))
+  }
+
   test("EXPLAIN only update") {
     createAndInitTable("pk INT NOT NULL, dep STRING", """{ "pk": 1, "dep": 
"hr" }""")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to