This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 171e2ce7d117 [SPARK-50373] Prohibit Variant from set operations
171e2ce7d117 is described below
commit 171e2ce7d1176e48e29112fb8e4390bc5426a1d8
Author: Harsh Motwani <[email protected]>
AuthorDate: Tue Dec 3 02:15:57 2024 +0800
[SPARK-50373] Prohibit Variant from set operations
### What changes were proposed in this pull request?
Prior to this PR, Variant columns could be used with set operations like
`DISTINCT`, `INTERSECT` and `EXCEPT`. This PR prohibits this behavior since
Variant is not orderable.
### Why are the changes needed?
Variant equality is not defined, and therefore, these operations are also
undefined.
### Does this PR introduce _any_ user-facing change?
Yes, users will no longer be able to perform set operations on variant
columns.
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48909 from harshmotw-db/harsh-motwani_data/variant_distinct_fix.
Authored-by: Harsh Motwani <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 ++
.../sql/catalyst/analysis/CheckAnalysis.scala | 21 ++++++
.../spark/sql/DataFrameSetOperationsSuite.scala | 78 ++++++++++++++++++++++
3 files changed, 104 insertions(+)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 024caf86cf94..632d43b4d105 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5360,6 +5360,11 @@
"Cannot have MAP type columns in DataFrame which calls set
operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is
<dataType>."
]
},
+ "SET_OPERATION_ON_VARIANT_TYPE" : {
+ "message" : [
+ "Cannot have VARIANT type columns in DataFrame which calls set
operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is
<dataType>."
+ ]
+ },
"SET_PROPERTIES_AND_DBPROPERTIES" : {
"message" : [
"set PROPERTIES and DBPROPERTIES at the same time."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 1c76fd7d00f7..c13da35334ba 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -76,6 +76,10 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
dt.existsRecursively(_.isInstanceOf[MapType])
}
+ protected def hasVariantType(dt: DataType): Boolean = {
+ dt.existsRecursively(_.isInstanceOf[VariantType])
+ }
+
protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute]
= plan match {
case _: Intersect | _: Except | _: Distinct =>
plan.output.find(a => hasMapType(a.dataType))
@@ -84,6 +88,14 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
case _ => None
}
+ protected def variantColumnInSetOperation(plan: LogicalPlan):
Option[Attribute] = plan match {
+ case _: Intersect | _: Except | _: Distinct =>
+ plan.output.find(a => hasVariantType(a.dataType))
+ case d: Deduplicate =>
+ d.keys.find(a => hasVariantType(a.dataType))
+ case _ => None
+ }
+
private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit
= {
limitExpr match {
case e if !e.foldable => limitExpr.failAnalysis(
@@ -828,6 +840,15 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
"colName" -> toSQLId(mapCol.name),
"dataType" -> toSQLType(mapCol.dataType)))
+ // TODO: Remove this type check once we support Variant ordering
+ case o if variantColumnInSetOperation(o).isDefined =>
+ val variantCol = variantColumnInSetOperation(o).get
+ o.failAnalysis(
+ errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ messageParameters = Map(
+ "colName" -> toSQLId(variantCol.name),
+ "dataType" -> toSQLType(variantCol.dataType)))
+
case o if o.expressions.exists(!_.deterministic) &&
!operatorAllowsNonDeterministicExpressions(o) &&
!o.isInstanceOf[Project] &&
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 5ff737d2b57c..9c182be0f7dd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -350,6 +350,84 @@ class DataFrameSetOperationsSuite extends QueryTest
dates.intersect(widenTypedRows).collect()
}
+ test("SPARK-50373 - cannot run set operations with variant type") {
+ val df = sql("select parse_json(case when id = 0 then 'null' else '1'
end)" +
+ " as v, id % 5 as id from range(0, 100, 1, 5)")
+ checkError(
+ exception = intercept[AnalysisException](df.intersect(df)),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`v`",
+ "dataType" -> "\"VARIANT\"")
+ )
+ checkError(
+ exception = intercept[AnalysisException](df.except(df)),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`v`",
+ "dataType" -> "\"VARIANT\"")
+ )
+ checkError(
+ exception = intercept[AnalysisException](df.distinct()),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`v`",
+ "dataType" -> "\"VARIANT\""))
+ checkError(
+ exception = intercept[AnalysisException](df.dropDuplicates()),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`v`",
+ "dataType" -> "\"VARIANT\""))
+ withTempView("tv") {
+ df.createOrReplaceTempView("tv")
+ checkError(
+ exception = intercept[AnalysisException](sql("SELECT DISTINCT v FROM
tv")),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`v`",
+ "dataType" -> "\"VARIANT\""),
+ context = ExpectedContext(
+ fragment = "SELECT DISTINCT v FROM tv",
+ start = 0,
+ stop = 24)
+ )
+ checkError(
+ exception = intercept[AnalysisException](sql("SELECT DISTINCT
STRUCT(v) FROM tv")),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`struct(v)`",
+ "dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\""),
+ context = ExpectedContext(
+ fragment = "SELECT DISTINCT STRUCT(v) FROM tv",
+ start = 0,
+ stop = 32)
+ )
+ checkError(
+ exception = intercept[AnalysisException](sql("SELECT DISTINCT ARRAY(v)
FROM tv")),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+ parameters = Map(
+ "colName" -> "`array(v)`",
+ "dataType" -> "\"ARRAY<VARIANT>\""),
+ context = ExpectedContext(
+ fragment = "SELECT DISTINCT ARRAY(v) FROM tv",
+ start = 0,
+ stop = 31)
+ )
+ checkError(
+ exception = intercept[AnalysisException](sql("SELECT DISTINCT MAP('m',
v) FROM tv")),
+ condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+ parameters = Map(
+ "colName" -> "`map(m, v)`",
+ "dataType" -> "\"MAP<STRING, VARIANT>\""),
+ context = ExpectedContext(
+ fragment = "SELECT DISTINCT MAP('m', v) FROM tv",
+ start = 0,
+ stop = 34)
+ )
+ }
+ }
+
test("SPARK-19893: cannot run set operations with map type") {
val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
checkError(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]