This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 171e2ce7d117 [SPARK-50373] Prohibit Variant from set operations
171e2ce7d117 is described below

commit 171e2ce7d1176e48e29112fb8e4390bc5426a1d8
Author: Harsh Motwani <[email protected]>
AuthorDate: Tue Dec 3 02:15:57 2024 +0800

    [SPARK-50373] Prohibit Variant from set operations
    
    ### What changes were proposed in this pull request?
    
    Prior to this PR, Variant columns could be used with set operations like
    `DISTINCT`, `INTERSECT` and `EXCEPT`. This PR prohibits this behavior since
    Variant is not orderable.
    
    ### Why are the changes needed?
    
    Variant equality is not defined, and therefore, these operations are also
    undefined.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will no longer be able to perform set operations on variant
    columns.
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48909 from harshmotw-db/harsh-motwani_data/variant_distinct_fix.
    
    Authored-by: Harsh Motwani <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  5 ++
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 21 ++++++
 .../spark/sql/DataFrameSetOperationsSuite.scala    | 78 ++++++++++++++++++++++
 3 files changed, 104 insertions(+)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 024caf86cf94..632d43b4d105 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5360,6 +5360,11 @@
           "Cannot have MAP type columns in DataFrame which calls set 
operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is 
<dataType>."
         ]
       },
+      "SET_OPERATION_ON_VARIANT_TYPE" : {
+        "message" : [
+          "Cannot have VARIANT type columns in DataFrame which calls set 
operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is 
<dataType>."
+        ]
+      },
       "SET_PROPERTIES_AND_DBPROPERTIES" : {
         "message" : [
           "set PROPERTIES and DBPROPERTIES at the same time."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 1c76fd7d00f7..c13da35334ba 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -76,6 +76,10 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
     dt.existsRecursively(_.isInstanceOf[MapType])
   }
 
+  protected def hasVariantType(dt: DataType): Boolean = {
+    dt.existsRecursively(_.isInstanceOf[VariantType])
+  }
+
   protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] 
= plan match {
     case _: Intersect | _: Except | _: Distinct =>
       plan.output.find(a => hasMapType(a.dataType))
@@ -84,6 +88,14 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
     case _ => None
   }
 
+  protected def variantColumnInSetOperation(plan: LogicalPlan): 
Option[Attribute] = plan match {
+    case _: Intersect | _: Except | _: Distinct =>
+      plan.output.find(a => hasVariantType(a.dataType))
+    case d: Deduplicate =>
+      d.keys.find(a => hasVariantType(a.dataType))
+    case _ => None
+  }
+
   private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit 
= {
     limitExpr match {
       case e if !e.foldable => limitExpr.failAnalysis(
@@ -828,6 +840,15 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
                 "colName" -> toSQLId(mapCol.name),
                 "dataType" -> toSQLType(mapCol.dataType)))
 
+          // TODO: Remove this type check once we support Variant ordering
+          case o if variantColumnInSetOperation(o).isDefined =>
+            val variantCol = variantColumnInSetOperation(o).get
+            o.failAnalysis(
+              errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+              messageParameters = Map(
+                "colName" -> toSQLId(variantCol.name),
+                "dataType" -> toSQLType(variantCol.dataType)))
+
           case o if o.expressions.exists(!_.deterministic) &&
             !operatorAllowsNonDeterministicExpressions(o) &&
             !o.isInstanceOf[Project] &&
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 5ff737d2b57c..9c182be0f7dd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -350,6 +350,84 @@ class DataFrameSetOperationsSuite extends QueryTest
     dates.intersect(widenTypedRows).collect()
   }
 
+  test("SPARK-50373 - cannot run set operations with variant type") {
+    val df = sql("select parse_json(case when id = 0 then 'null' else '1' 
end)" +
+      " as v, id % 5 as id from range(0, 100, 1, 5)")
+    checkError(
+      exception = intercept[AnalysisException](df.intersect(df)),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException](df.except(df)),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException](df.distinct()),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\""))
+    checkError(
+      exception = intercept[AnalysisException](df.dropDuplicates()),
+      condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+      parameters = Map(
+        "colName" -> "`v`",
+        "dataType" -> "\"VARIANT\""))
+    withTempView("tv") {
+      df.createOrReplaceTempView("tv")
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT v FROM 
tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+        parameters = Map(
+          "colName" -> "`v`",
+          "dataType" -> "\"VARIANT\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT v FROM tv",
+          start = 0,
+          stop = 24)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT 
STRUCT(v) FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+        parameters = Map(
+          "colName" -> "`struct(v)`",
+          "dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT STRUCT(v) FROM tv",
+          start = 0,
+          stop = 32)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT ARRAY(v) 
FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
+        parameters = Map(
+          "colName" -> "`array(v)`",
+          "dataType" -> "\"ARRAY<VARIANT>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT ARRAY(v) FROM tv",
+          start = 0,
+          stop = 31)
+      )
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT MAP('m', 
v) FROM tv")),
+        condition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+        parameters = Map(
+          "colName" -> "`map(m, v)`",
+          "dataType" -> "\"MAP<STRING, VARIANT>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT MAP('m', v) FROM tv",
+          start = 0,
+          stop = 34)
+      )
+    }
+  }
+
   test("SPARK-19893: cannot run set operations with map type") {
     val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
     checkError(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to