This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ec120fddcf64 [SPARK-51987][SPARK-52235][SQL][FOLLOW-UP] Add ANSI mode check for new tests
ec120fddcf64 is described below
commit ec120fddcf64068d0a28aa801e496f68618a6bc8
Author: Szehon Ho <[email protected]>
AuthorDate: Fri Jun 6 09:34:18 2025 -0700
[SPARK-51987][SPARK-52235][SQL][FOLLOW-UP] Add ANSI mode check for new tests
### What changes were proposed in this pull request?
Fix non-ANSI test breakage as per https://github.com/apache/spark/pull/50959#pullrequestreview-2890552215
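In practice, the per-test withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") wrappers are dropped and each suite pins ANSI mode once via a sparkConf override. A minimal sketch of that pattern, assuming a SharedSparkSession-based suite (the suite name and the single test are illustrative, not part of this change):

package org.apache.spark.sql.connector

import org.apache.spark.SparkConf
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite showing the conf-override pattern this PR applies:
// ANSI mode and the test catalog are fixed once for the whole suite,
// rather than set in a before { } block and cleared after every test.
class ExampleAnsiDefaultsSuite extends QueryTest with SharedSparkSession {

  override protected def sparkConf: SparkConf = super.sparkConf
    .set(SQLConf.ANSI_ENABLED, true)
    .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)

  test("every test here runs with ANSI semantics") {
    checkAnswer(sql("SELECT 100 + 23"), Row(123))
  }
}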
### Why are the changes needed?
Many V2 Expressions are only converted successfully in ANSI mode, so these tests for V2 Expressions only make sense in that mode.
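For example (shape lifted from the assertions in the diff below), a column default such as dep STRING DEFAULT ('h' || 'r') is only expected to translate into a connector V2 expression under ANSI semantics; the expectedDep binding here is illustrative:

import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Under ANSI mode, ('h' || 'r') should surface to the connector as a
// CONCAT scalar expression plus its folded literal value "hr".
val expectedDep = new ColumnDefaultValue(
  "('h' || 'r')",
  new GeneralScalarExpression(
    "CONCAT",
    Array(
      LiteralValue(UTF8String.fromString("h"), StringType),
      LiteralValue(UTF8String.fromString("r"), StringType))),
  LiteralValue(UTF8String.fromString("hr"), StringType))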
### Does this PR introduce _any_ user-facing change?
No, test only
### How was this patch tested?
Ran the tests in non-ANSI mode.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51092 from szehon-ho/SPARK-52235-follow.
Lead-authored-by: Szehon Ho <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/connector/DataSourceV2DataFrameSuite.scala | 407 ++++++++++-----------
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 5 +-
.../spark/sql/connector/DatasourceV2SQLBase.scala | 4 +-
3 files changed, 207 insertions(+), 209 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index a7bfcfcfea62..342eefa1a6f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector
import java.util.Collections
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
@@ -40,14 +40,13 @@ class DataSourceV2DataFrameSuite
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import testImplicits._
- before {
- spark.conf.set("spark.sql.catalog.testcat",
classOf[InMemoryTableCatalog].getName)
- spark.conf.set("spark.sql.catalog.testcat2",
classOf[InMemoryTableCatalog].getName)
- }
+ override protected def sparkConf: SparkConf = super.sparkConf
+ .set(SQLConf.ANSI_ENABLED, true)
+ .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
after {
spark.sessionState.catalogManager.reset()
- spark.sessionState.conf.clear()
}
override protected val catalogAndNamespace: String = "testcat.ns1.ns2.tbls"
@@ -352,185 +351,180 @@ class DataSourceV2DataFrameSuite
test("create/replace table with complex foldable default values") {
val tableName = "testcat.ns1.ns2.tbl"
withTable(tableName) {
- withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
- val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
- sql(
- s"""
- |CREATE TABLE $tableName (
- | id INT,
- | salary INT DEFAULT (100 + 23),
- | dep STRING DEFAULT ('h' || 'r'),
- | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN)
- |) USING foo
- |""".stripMargin)
- }
+ val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+ sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id INT,
+ | salary INT DEFAULT (100 + 23),
+ | dep STRING DEFAULT ('h' || 'r'),
+ | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN)
+ |) USING foo
+ |""".stripMargin)
+ }
- checkDefaultValues(
- createExec.columns,
- Array(
- null,
- new ColumnDefaultValue(
- "(100 + 23)",
- new GeneralScalarExpression(
- "+",
- Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
- LiteralValue(123, IntegerType)),
- new ColumnDefaultValue(
- "('h' || 'r')",
- new GeneralScalarExpression(
- "CONCAT",
- Array(
- LiteralValue(UTF8String.fromString("h"), StringType),
- LiteralValue(UTF8String.fromString("r"), StringType))),
- LiteralValue(UTF8String.fromString("hr"), StringType)),
- new ColumnDefaultValue(
- "CAST(1 AS BOOLEAN)",
- new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType),
- LiteralValue(true, BooleanType))))
+ checkDefaultValues(
+ createExec.columns,
+ Array(
+ null,
+ new ColumnDefaultValue(
+ "(100 + 23)",
+ new GeneralScalarExpression(
+ "+",
+ Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
+ LiteralValue(123, IntegerType)),
+ new ColumnDefaultValue(
+ "('h' || 'r')",
+ new GeneralScalarExpression(
+ "CONCAT",
+ Array(
+ LiteralValue(UTF8String.fromString("h"), StringType),
+ LiteralValue(UTF8String.fromString("r"), StringType))),
+ LiteralValue(UTF8String.fromString("hr"), StringType)),
+ new ColumnDefaultValue(
+ "CAST(1 AS BOOLEAN)",
+ new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType),
+ LiteralValue(true, BooleanType))))
- val df1 = Seq(1).toDF("id")
- df1.writeTo(tableName).append()
+ val df1 = Seq(1).toDF("id")
+ df1.writeTo(tableName).append()
- sql(s"ALTER TABLE $tableName ALTER COLUMN dep SET DEFAULT ('i' ||
't')")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN dep SET DEFAULT ('i' || 't')")
- val df2 = Seq(2).toDF("id")
- df2.writeTo(tableName).append()
+ val df2 = Seq(2).toDF("id")
+ df2.writeTo(tableName).append()
- checkAnswer(
- sql(s"SELECT * FROM $tableName"),
- Seq(
- Row(1, 123, "hr", true),
- Row(2, 123, "it", true)))
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Seq(
+ Row(1, 123, "hr", true),
+ Row(2, 123, "it", true)))
- val replaceExec = executeAndKeepPhysicalPlan[ReplaceTableExec] {
- sql(
- s"""
- |REPLACE TABLE $tableName (
- | id INT,
- | salary INT DEFAULT (50 * 2),
- | dep STRING DEFAULT ('un' || 'known'),
- | active BOOLEAN DEFAULT CAST(0 AS BOOLEAN)
- |) USING foo
- |""".stripMargin)
- }
+ val replaceExec = executeAndKeepPhysicalPlan[ReplaceTableExec] {
+ sql(
+ s"""
+ |REPLACE TABLE $tableName (
+ | id INT,
+ | salary INT DEFAULT (50 * 2),
+ | dep STRING DEFAULT ('un' || 'known'),
+ | active BOOLEAN DEFAULT CAST(0 AS BOOLEAN)
+ |) USING foo
+ |""".stripMargin)
+ }
- checkDefaultValues(
- replaceExec.columns,
- Array(
- null,
- new ColumnDefaultValue(
- "(50 * 2)",
- new GeneralScalarExpression(
- "*",
- Array(LiteralValue(50, IntegerType), LiteralValue(2, IntegerType))),
- LiteralValue(100, IntegerType)),
- new ColumnDefaultValue(
- "('un' || 'known')",
- new GeneralScalarExpression(
- "CONCAT",
- Array(
- LiteralValue(UTF8String.fromString("un"), StringType),
- LiteralValue(UTF8String.fromString("known"), StringType))),
- LiteralValue(UTF8String.fromString("unknown"), StringType)),
- new ColumnDefaultValue(
- "CAST(0 AS BOOLEAN)",
- new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType),
- LiteralValue(false, BooleanType))))
+ checkDefaultValues(
+ replaceExec.columns,
+ Array(
+ null,
+ new ColumnDefaultValue(
+ "(50 * 2)",
+ new GeneralScalarExpression(
+ "*",
+ Array(LiteralValue(50, IntegerType), LiteralValue(2, IntegerType))),
+ LiteralValue(100, IntegerType)),
+ new ColumnDefaultValue(
+ "('un' || 'known')",
+ new GeneralScalarExpression(
+ "CONCAT",
+ Array(
+ LiteralValue(UTF8String.fromString("un"), StringType),
+ LiteralValue(UTF8String.fromString("known"), StringType))),
+ LiteralValue(UTF8String.fromString("unknown"), StringType)),
+ new ColumnDefaultValue(
+ "CAST(0 AS BOOLEAN)",
+ new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType),
+ LiteralValue(false, BooleanType))))
- val df3 = Seq(1).toDF("id")
- df3.writeTo(tableName).append()
+ val df3 = Seq(1).toDF("id")
+ df3.writeTo(tableName).append()
- checkAnswer(
- sql(s"SELECT * FROM $tableName"),
- Seq(Row(1, 100, "unknown", false)))
- }
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Seq(Row(1, 100, "unknown", false)))
}
}
+
test("alter table add column with complex foldable default values") {
val tableName = "testcat.ns1.ns2.tbl"
- withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
- withTable(tableName) {
- sql(
- s"""
- |CREATE TABLE $tableName (
- | dummy INT
- |) USING foo
- |""".stripMargin)
-
- val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] {
- sql(s"ALTER TABLE $tableName ADD COLUMNS (" +
- s"salary INT DEFAULT (100 + 23), " +
- s"dep STRING DEFAULT ('h' || 'r'), " +
- s"active BOOLEAN DEFAULT CAST(1 AS BOOLEAN))")
- }
+ withTable(tableName) {
+ sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | dummy INT
+ |) USING foo
+ |""".stripMargin)
- checkDefaultValues(
- alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray,
- Array(
- new ColumnDefaultValue(
- "(100 + 23)",
- new GeneralScalarExpression(
- "+",
- Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
- LiteralValue(123, IntegerType)),
- new ColumnDefaultValue(
- "('h' || 'r')",
- new GeneralScalarExpression(
- "CONCAT",
- Array(
- LiteralValue(UTF8String.fromString("h"), StringType),
- LiteralValue(UTF8String.fromString("r"), StringType))),
- LiteralValue(UTF8String.fromString("hr"), StringType)),
- new ColumnDefaultValue(
- "CAST(1 AS BOOLEAN)",
- new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType),
- LiteralValue(true, BooleanType))))
+ val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] {
+ sql(s"ALTER TABLE $tableName ADD COLUMNS (" +
+ s"salary INT DEFAULT (100 + 23), " +
+ s"dep STRING DEFAULT ('h' || 'r'), " +
+ s"active BOOLEAN DEFAULT CAST(1 AS BOOLEAN))")
}
+
+ checkDefaultValues(
+ alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray,
+ Array(
+ new ColumnDefaultValue(
+ "(100 + 23)",
+ new GeneralScalarExpression(
+ "+",
+ Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
+ LiteralValue(123, IntegerType)),
+ new ColumnDefaultValue(
+ "('h' || 'r')",
+ new GeneralScalarExpression(
+ "CONCAT",
+ Array(
+ LiteralValue(UTF8String.fromString("h"), StringType),
+ LiteralValue(UTF8String.fromString("r"), StringType))),
+ LiteralValue(UTF8String.fromString("hr"), StringType)),
+ new ColumnDefaultValue(
+ "CAST(1 AS BOOLEAN)",
+ new V2Cast(LiteralValue(1, IntegerType), IntegerType, BooleanType),
+ LiteralValue(true, BooleanType))))
}
}
test("alter table alter column with complex foldable default values") {
val tableName = "testcat.ns1.ns2.tbl"
- withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
- withTable(tableName) {
- sql(
- s"""
- |CREATE TABLE $tableName (
- | salary INT DEFAULT (100 + 23),
- | dep STRING DEFAULT ('h' || 'r'),
- | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN)
- |) USING foo
- |""".stripMargin)
+ withTable(tableName) {
+ sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | salary INT DEFAULT (100 + 23),
+ | dep STRING DEFAULT ('h' || 'r'),
+ | active BOOLEAN DEFAULT CAST(1 AS BOOLEAN)
+ |) USING foo
+ |""".stripMargin)
- val alterExecCol1 = executeAndKeepPhysicalPlan[AlterTableExec] {
- sql(
- s"""
- |ALTER TABLE $tableName ALTER COLUMN
- | salary SET DEFAULT (123 + 56),
- | dep SET DEFAULT ('r' || 'l'),
- | active SET DEFAULT CAST(0 AS BOOLEAN)
- |""".stripMargin)
- }
- checkDefaultValues(
- alterExecCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray,
- Array(
- new DefaultValue(
- "(123 + 56)",
- new GeneralScalarExpression(
- "+",
- Array(LiteralValue(123, IntegerType), LiteralValue(56, IntegerType)))),
- new DefaultValue(
- "('r' || 'l')",
- new GeneralScalarExpression(
- "CONCAT",
- Array(
- LiteralValue(UTF8String.fromString("r"), StringType),
- LiteralValue(UTF8String.fromString("l"), StringType)))),
- new DefaultValue(
- "CAST(0 AS BOOLEAN)",
- new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType))))
+ val alterExecCol1 = executeAndKeepPhysicalPlan[AlterTableExec] {
+ sql(
+ s"""
+ |ALTER TABLE $tableName ALTER COLUMN
+ | salary SET DEFAULT (123 + 56),
+ | dep SET DEFAULT ('r' || 'l'),
+ | active SET DEFAULT CAST(0 AS BOOLEAN)
+ |""".stripMargin)
}
+ checkDefaultValues(
+ alterExecCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray,
+ Array(
+ new DefaultValue(
+ "(123 + 56)",
+ new GeneralScalarExpression(
+ "+",
+ Array(LiteralValue(123, IntegerType), LiteralValue(56, IntegerType)))),
+ new DefaultValue(
+ "('r' || 'l')",
+ new GeneralScalarExpression(
+ "CONCAT",
+ Array(
+ LiteralValue(UTF8String.fromString("r"), StringType),
+ LiteralValue(UTF8String.fromString("l"), StringType)))),
+ new DefaultValue(
+ "CAST(0 AS BOOLEAN)",
+ new V2Cast(LiteralValue(0, IntegerType), IntegerType, BooleanType))))
}
}
@@ -677,7 +671,6 @@ class DataSourceV2DataFrameSuite
test("create/replace table default value expression should have a cast") {
val tableName = "testcat.ns1.ns2.tbl"
withTable(tableName) {
-
val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
sql(
s"""
@@ -693,8 +686,8 @@ class DataSourceV2DataFrameSuite
null,
new ColumnDefaultValue(
"'2018-11-17 13:33:33'",
- new LiteralValue(1542490413000000L, TimestampType),
- new LiteralValue(1542490413000000L, TimestampType)),
+ LiteralValue(1542490413000000L, TimestampType),
+ LiteralValue(1542490413000000L, TimestampType)),
new ColumnDefaultValue(
"1",
new V2Cast(LiteralValue(1, IntegerType), IntegerType, DoubleType),
@@ -728,53 +721,53 @@ class DataSourceV2DataFrameSuite
}
}
+
test("alter table default value expression should have a cast") {
val tableName = "testcat.ns1.ns2.tbl"
- withTable(tableName) {
-
- sql(s"CREATE TABLE $tableName (col1 int) using foo")
- val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] {
- sql(
- s"""
- |ALTER TABLE $tableName ADD COLUMNS (
- | col2 timestamp DEFAULT '2018-11-17 13:33:33',
- | col3 double DEFAULT 1)
- |""".stripMargin)
- }
+ withTable(tableName) {
+ sql(s"CREATE TABLE $tableName (col1 int) using foo")
+ val alterExec = executeAndKeepPhysicalPlan[AlterTableExec] {
+ sql(
+ s"""
+ |ALTER TABLE $tableName ADD COLUMNS (
+ | col2 timestamp DEFAULT '2018-11-17 13:33:33',
+ | col3 double DEFAULT 1)
+ |""".stripMargin)
+ }
- checkDefaultValues(
- alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray,
- Array(
- new ColumnDefaultValue(
- "'2018-11-17 13:33:33'",
- LiteralValue(1542490413000000L, TimestampType),
- LiteralValue(1542490413000000L, TimestampType)),
- new ColumnDefaultValue(
- "1",
- new V2Cast(LiteralValue(1, IntegerType), IntegerType, DoubleType),
- LiteralValue(1.0, DoubleType))))
+ checkDefaultValues(
+ alterExec.changes.map(_.asInstanceOf[AddColumn]).toArray,
+ Array(
+ new ColumnDefaultValue(
+ "'2018-11-17 13:33:33'",
+ LiteralValue(1542490413000000L, TimestampType),
+ LiteralValue(1542490413000000L, TimestampType)),
+ new ColumnDefaultValue(
+ "1",
+ new V2Cast(LiteralValue(1, IntegerType), IntegerType, DoubleType),
+ LiteralValue(1.0, DoubleType))))
- val alterCol1 = executeAndKeepPhysicalPlan[AlterTableExec] {
- sql(
- s"""
- |ALTER TABLE $tableName ALTER COLUMN
- | col2 SET DEFAULT '2022-02-23 05:55:55',
- | col3 SET DEFAULT (1 + 1)
- |""".stripMargin)
+ val alterCol1 = executeAndKeepPhysicalPlan[AlterTableExec] {
+ sql(
+ s"""
+ |ALTER TABLE $tableName ALTER COLUMN
+ | col2 SET DEFAULT '2022-02-23 05:55:55',
+ | col3 SET DEFAULT (1 + 1)
+ |""".stripMargin)
+ }
+ checkDefaultValues(
+ alterCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray,
+ Array(
+ new DefaultValue("'2022-02-23 05:55:55'",
+ LiteralValue(1645624555000000L, TimestampType)),
+ new DefaultValue(
+ "(1 + 1)",
+ new V2Cast(
+ new GeneralScalarExpression("+", Array(LiteralValue(1, IntegerType),
+ LiteralValue(1, IntegerType))),
+ IntegerType,
+ DoubleType))))
}
- checkDefaultValues(
- alterCol1.changes.map(_.asInstanceOf[UpdateColumnDefaultValue]).toArray,
- Array(
- new DefaultValue("'2022-02-23 05:55:55'",
- LiteralValue(1645624555000000L, TimestampType)),
- new DefaultValue(
- "(1 + 1)",
- new V2Cast(
- new GeneralScalarExpression("+", Array(LiteralValue(1, IntegerType),
- LiteralValue(1, IntegerType))),
- IntegerType,
- DoubleType))))
- }
}
test("write with supported expression-based default values") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 48ccf33faaa1..cd4cd462088a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -25,7 +25,7 @@ import java.util.Locale
import scala.concurrent.duration.MICROSECONDS
import scala.jdk.CollectionConverters._
-import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
@@ -57,6 +57,9 @@ abstract class DataSourceV2SQLSuite
with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
with AdaptiveSparkPlanHelper {
+ override protected def sparkConf: SparkConf =
+ super.sparkConf.set(SQLConf.ANSI_ENABLED, true)
+
protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
override protected val v2Format = v2Source
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala
index 4ccff44fa067..683d814d28b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DatasourceV2SQLBase.scala
@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, InMemoryCatalog, InMemoryPartitionTableCatalog, InMemoryTableWithV2FilterCatalog, StagingInMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
trait DatasourceV2SQLBase
@@ -52,6 +53,7 @@ trait DatasourceV2SQLBase
after {
spark.sessionState.catalog.reset()
spark.sessionState.catalogManager.reset()
- spark.sessionState.conf.clear()
+ spark.sessionState.conf.unsetConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION)
+ spark.sessionState.conf.unsetConf(SQLConf.DEFAULT_CATALOG)
}
}