This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2e379be018d7 [SPARK-52455][SQL][TESTS] Test expression-based defaults in UPDATE and MERGE
2e379be018d7 is described below
commit 2e379be018d7ac50029268d15270c7a043268ac8
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat Jun 14 11:42:07 2025 -0700
[SPARK-52455][SQL][TESTS] Test expression-based defaults in UPDATE and MERGE
### What changes were proposed in this pull request?
This PR adds tests for expression-based defaults in UPDATE and MERGE.
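An expression-based default pairs a connector expression with its pre-evaluated literal value. A minimal sketch of the API shape, mirroring the tests added below (the default expression `100 + 23` with `123` as its evaluated value):

    new ColumnDefaultValue(
      new GeneralScalarExpression(
        "+",
        Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
      LiteralValue(123, IntegerType))

Statements that reference `DEFAULT`, such as `UPDATE t SET value = DEFAULT`, are then expected to write `123` for that column.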
### Why are the changes needed?
To complete default value support.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR comes with tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51161 from aokolnychyi/spark-52455.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/connector/MergeIntoTableSuiteBase.scala | 49 ++++++++++++++++++-
.../spark/sql/connector/UpdateTableSuiteBase.scala | 57 +++++++++++++++++++++-
2 files changed, 102 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index b43424793d44..137c62a8112a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -21,8 +21,8 @@ import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
-import org.apache.spark.sql.connector.expressions.LiteralValue
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableInfo}
+import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, CartesianProductExec}
import org.apache.spark.sql.internal.SQLConf
@@ -32,6 +32,51 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase {
   import testImplicits._
+ test("merge into table with expression-based default values") {
+ val columns = Array(
+ Column.create("pk", IntegerType),
+ Column.create("salary", IntegerType),
+ Column.create("dep", StringType),
+ Column.create(
+ "value",
+ IntegerType,
+ false, /* not nullable */
+ null, /* no comment */
+ new ColumnDefaultValue(
+ new GeneralScalarExpression(
+ "+",
+ Array(LiteralValue(100, IntegerType), LiteralValue(23,
IntegerType))),
+ LiteralValue(123, IntegerType)),
+ "{}"))
+ val tableInfo = new TableInfo.Builder().withColumns(columns).build()
+ catalog.createTable(ident, tableInfo)
+
+ withTempView("source") {
+ val sourceRows = Seq(
+ (1, 500, "eng"),
+ (2, 600, "hr"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ sql(s"INSERT INTO $tableNameAsString (pk, salary, dep, value) VALUES (1,
200, 'eng', 999)")
+
+ sql(
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET value = DEFAULT
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, s.salary, s.dep)
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 200, "eng", 123), // update
+ Row(2, 600, "hr", 123))) // insert
+ }
+ }
+
test("merge into table containing added column with default value") {
withTempView("source") {
sql(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
index 0c3ed5106eba..c8c14876fcee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.connector
import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.Row
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
-import org.apache.spark.sql.connector.expressions.LiteralValue
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableChange, TableInfo}
+import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.{IntegerType, StringType}
abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase {
@@ -64,6 +64,59 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase {
         Row(5, 500, "hr", null)))
   }
+ test("update table with expression-based default values") {
+ val columns = Array(
+ Column.create("pk", IntegerType),
+ Column.create("salary", IntegerType),
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder().withColumns(columns).build()
+ catalog.createTable(ident, tableInfo)
+
+ append("pk INT, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val addColumn = TableChange.addColumn(
+ Array("value"),
+ IntegerType,
+ false, /* not nullable */
+ null, /* no comment */
+ null, /* no position */
+ new ColumnDefaultValue(
+ new GeneralScalarExpression(
+ "+",
+ Array(LiteralValue(100, IntegerType), LiteralValue(23,
IntegerType))),
+ LiteralValue(123, IntegerType)))
+ catalog.alterTable(ident, addColumn)
+
+ append("pk INT, salary INT, dep STRING, value INT",
+ """{ "pk": 4, "salary": 400, "dep": "hr", "value": -4 }
+ |{ "pk": 5, "salary": 500, "dep": "hr", "value": -5 }
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr", 123),
+ Row(2, 200, "software", 123),
+ Row(3, 300, "hr", 123),
+ Row(4, 400, "hr", -4),
+ Row(5, 500, "hr", -5)))
+
+ sql(s"UPDATE $tableNameAsString SET value = DEFAULT WHERE pk >= 5")
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr", 123),
+ Row(2, 200, "software", 123),
+ Row(3, 300, "hr", 123),
+ Row(4, 400, "hr", -4),
+ Row(5, 500, "hr", 123)))
+ }
+
test("EXPLAIN only update") {
createAndInitTable("pk INT NOT NULL, dep STRING", """{ "pk": 1, "dep":
"hr" }""")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]