This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 825682d3c95e [SPARK-51191][SQL] Validate default values handling in DELETE, UPDATE, MERGE
825682d3c95e is described below
commit 825682d3c95e078ba744a29880d5939f3634c915
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Mar 28 15:31:11 2025 -0700
[SPARK-51191][SQL] Validate default values handling in DELETE, UPDATE, MERGE
### What changes were proposed in this pull request?
This PR adds tests for default values handling in DELETE, UPDATE, MERGE.
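For context, the SQL surface being exercised looks like this (a minimal sketch in the style of these suites; the catalog and table names are illustrative, and a v2 catalog whose tables support row-level operations is assumed):

```scala
// Hypothetical names; DEFAULT resolution in DML is what the new tests pin down.
sql("CREATE TABLE cat.ns.t (pk INT, txt STRING DEFAULT 'initial-text')")
sql("INSERT INTO cat.ns.t (pk) VALUES (1)")            // txt filled with 'initial-text'
sql("UPDATE cat.ns.t SET txt = DEFAULT WHERE pk = 1")  // explicit DEFAULT in an assignment
sql("DELETE FROM cat.ns.t WHERE pk = 1")               // defaults must survive row-level rewrites
```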
### Why are the changes needed?
These tests are needed to ensure default values are handled correctly in
all DML operations.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR consists of tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #50271 from aokolnychyi/spark-51191.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../InMemoryRowLevelOperationTableCatalog.scala | 23 +++++++
.../sql/connector/DeleteFromTableSuiteBase.scala | 78 ++++++++++++++++++++++
.../sql/connector/MergeIntoTableSuiteBase.scala | 52 +++++++++++++++
.../spark/sql/connector/UpdateTableSuiteBase.scala | 37 ++++++++++
4 files changed, 190 insertions(+)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index deb200650bd5..f5a69cd96d96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -43,4 +43,27 @@ class InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog {
namespaces.putIfAbsent(ident.namespace.toList, Map())
table
}
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+ val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
+ val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
+ val schema = CatalogV2Util.applySchemaChanges(
+ table.schema,
+ changes,
+ tableProvider = Some("in-memory"),
+ statementType = "ALTER TABLE")
+ val partitioning = CatalogV2Util.applyClusterByChanges(table.partitioning, schema, changes)
+
+ // fail if the last column in the schema was dropped
+ if (schema.fields.isEmpty) {
+ throw new IllegalArgumentException("Cannot drop all fields")
+ }
+
+ val newTable = new InMemoryRowLevelOperationTable(table.name, schema, partitioning, properties)
+ newTable.withData(table.data)
+
+ tables.put(ident, newTable)
+
+ newTable
+ }
}
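The `alterTable` override added above is what lets the suites below evolve a table's schema mid-test (e.g. `ALTER TABLE ... ADD COLUMN ... DEFAULT`). A direct-call sketch of the same path, with an illustrative identifier and a plain column, assuming `catalog` is an instance of this class:

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.types.StringType

val ident = Identifier.of(Array("ns1"), "tbl")
// Equivalent of ALTER TABLE ns1.tbl ADD COLUMN txt STRING on the in-memory table.
catalog.alterTable(ident, TableChange.addColumn(Array("txt"), StringType))
```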
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 94109681b8ec..585480ace725 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -24,6 +24,84 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
import testImplicits._
+ test("delete from table containing added column with default value") {
+ createAndInitTable("pk INT NOT NULL, dep STRING", """{ "pk": 1, "dep": "hr" }""")
+
+ sql(s"ALTER TABLE $tableNameAsString ADD COLUMN txt STRING DEFAULT 'initial-text'")
+
+ append("pk INT, dep STRING",
+ """{ "pk": 2, "dep": "hr" }
+ |{ "pk": 3, "dep": "software" }
+ |{ "pk": 4, "dep": "hr" }
+ |""".stripMargin)
+
+ sql(s"ALTER TABLE $tableNameAsString ALTER COLUMN txt SET DEFAULT 'new-text'")
+
+ append("pk INT, dep STRING",
+ """{ "pk": 5, "dep": "hr" }
+ |{ "pk": 6, "dep": "hr" }
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, "hr", "initial-text"),
+ Row(2, "hr", "initial-text"),
+ Row(3, "software", "initial-text"),
+ Row(4, "hr", "initial-text"),
+ Row(5, "hr", "new-text"),
+ Row(6, "hr", "new-text")))
+
+ sql(s"DELETE FROM $tableNameAsString WHERE pk IN (2, 5)")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, "hr", "initial-text"),
+ Row(3, "software", "initial-text"),
+ Row(4, "hr", "initial-text"),
+ Row(6, "hr", "new-text")))
+ }
+
+ test("delete from table containing struct column with default value") {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ | pk INT NOT NULL,
+ | complex STRUCT<c1:INT,c2:STRING> DEFAULT struct(-1, 'unknown'),
+ | dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
+
+ append("pk INT NOT NULL, dep STRING",
+ """{ "pk": 1, "dep": "hr" }
+ |{ "pk": 2, "dep": "hr" }
+ |{ "pk": 3, "dep": "hr" }
+ |""".stripMargin)
+
+ append("pk INT NOT NULL, complex STRUCT<c1:INT,c2:STRING>, dep STRING",
+ """{ "pk": 4, "dep": "hr" }
+ |{ "pk": 5, "complex": { "c1": 5, "c2": "v5" }, "dep": "hr" }
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(-1, "unknown"), "hr"),
+ Row(2, Row(-1, "unknown"), "hr"),
+ Row(3, Row(-1, "unknown"), "hr"),
+ Row(4, null, "hr"),
+ Row(5, Row(5, "v5"), "hr")))
+
+ sql(s"DELETE FROM $tableNameAsString WHERE pk < 3")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(3, Row(-1, "unknown"), "hr"),
+ Row(4, null, "hr"),
+ Row(5, Row(5, "v5"), "hr")))
+ }
+
test("EXPLAIN only delete") {
createAndInitTable("id INT, dep STRING", """{ "id": 1, "dep": "hr" }""")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 053616c88d63..f74ad0a00247 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -32,6 +32,58 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase {
import testImplicits._
+ test("merge into table containing added column with default value") {
+ withTempView("source") {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ | pk INT NOT NULL,
+ | salary INT NOT NULL DEFAULT -1,
+ | dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
+
+ append("pk INT NOT NULL, dep STRING",
+ """{ "pk": 1, "dep": "hr" }
+ |{ "pk": 2, "dep": "hr" }
+ |{ "pk": 3, "dep": "hr" }
+ |""".stripMargin)
+
+ sql(s"ALTER TABLE $tableNameAsString ADD COLUMN txt STRING DEFAULT 'initial-text'")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, -1, "hr", "initial-text"),
+ Row(2, -1, "hr", "initial-text"),
+ Row(3, -1, "hr", "initial-text")))
+
+ val sourceRows = Seq(
+ (1, 100, "hr"),
+ (4, 400, "hr"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ sql(
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET t.salary = s.salary, t.txt = DEFAULT
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, DEFAULT, s.dep)
+ |WHEN NOT MATCHED BY SOURCE THEN
+ | UPDATE SET salary = DEFAULT
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr", "initial-text"),
+ Row(2, -1, "hr", "initial-text"),
+ Row(3, -1, "hr", "initial-text"),
+ Row(4, -1, "hr", "initial-text")))
+ }
+ }
+
test("SPARK-45974: merge into non filter attributes table") {
val tableName: String = "cat.ns1.non_partitioned_table"
withTable(tableName) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
index f659ca6329e2..d33ad2494c3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
@@ -27,6 +27,43 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase {
import testImplicits._
+ test("update table containing added column with default value") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ sql(s"ALTER TABLE $tableNameAsString ADD COLUMN txt STRING DEFAULT 'initial-text'")
+
+ append("pk INT, salary INT, dep STRING, txt STRING",
+ """{ "pk": 3, "salary": 300, "dep": "hr", "txt": "explicit-text" }
+ |{ "pk": 4, "salary": 400, "dep": "software", "txt": "explicit-text" }
+ |{ "pk": 5, "salary": 500, "dep": "hr" }
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr", "initial-text"),
+ Row(2, 200, "software", "initial-text"),
+ Row(3, 300, "hr", "explicit-text"),
+ Row(4, 400, "software", "explicit-text"),
+ Row(5, 500, "hr", null)))
+
+ sql(s"ALTER TABLE $tableNameAsString ALTER COLUMN txt SET DEFAULT 'new-text'")
+
+ sql(s"UPDATE $tableNameAsString SET txt = DEFAULT WHERE pk IN (2, 8, 11)")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr", "initial-text"),
+ Row(2, 200, "software", "new-text"),
+ Row(3, 300, "hr", "explicit-text"),
+ Row(4, 400, "software", "explicit-text"),
+ Row(5, 500, "hr", null)))
+ }
+
test("EXPLAIN only update") {
createAndInitTable("pk INT NOT NULL, dep STRING", """{ "pk": 1, "dep": "hr" }""")