This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c6538301c0a8 [SPARK-47070][SQL][FOLLOW-UP] Add a flag guarding a 
subquery in aggregate rewrite
c6538301c0a8 is described below

commit c6538301c0a8f82991acb01c684d15ba25b7c377
Author: Anton Lykov <[email protected]>
AuthorDate: Thu Mar 7 10:56:08 2024 +0900

    [SPARK-47070][SQL][FOLLOW-UP] Add a flag guarding a subquery in aggregate 
rewrite
    
    ### What changes were proposed in this pull request?
    
    Add a flag that guards a recently introduced codepath inside the optimizer 
that wraps `exists` variables in an aggregate function. See 
[#45133](https://github.com/apache/spark/pull/45133) for details.
    
    ### Why are the changes needed?
    
    The flag guards the new query optimizer (QO) codepath so that it can be turned off if needed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No; only a new internal conf.
    
    ### How was this patch tested?
    
    No additional tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45412 from anton5798/agg-subquery-fup.
    
    Lead-authored-by: Anton Lykov <[email protected]>
    Co-authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/sql/catalyst/optimizer/subquery.scala    |  5 +++--
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 +++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 796c10b47012..c97e0aedf8c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY, 
IN_SUBQUERY, LATERAL_JOIN, LIST_SUBQUERY, PLAN_EXPRESSION, SCALAR_SUBQUERY}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
-import 
org.apache.spark.sql.internal.SQLConf.{DECORRELATE_PREDICATE_SUBQUERIES_IN_JOIN_CONDITION,
 OPTIMIZE_UNCORRELATED_IN_SUBQUERIES_IN_JOIN_CONDITION}
+import 
org.apache.spark.sql.internal.SQLConf.{DECORRELATE_PREDICATE_SUBQUERIES_IN_JOIN_CONDITION,
 OPTIMIZE_UNCORRELATED_IN_SUBQUERIES_IN_JOIN_CONDITION,
+  WRAP_EXISTS_IN_AGGREGATE_FUNCTION}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -257,7 +258,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] 
with PredicateHelper {
         newExpr.get
       }).withNewChildren(Seq(newChild))
       updatedNode match {
-        case a: Aggregate =>
+        case a: Aggregate if conf.getConf(WRAP_EXISTS_IN_AGGREGATE_FUNCTION) =>
           // If we have introduced new `exists`-attributes that are referenced 
by
           // aggregateExpressions within a non-aggregateFunction expression, 
we wrap them in
           // first() aggregate function. first() is Spark's executable version 
of any_value()
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5903e783b967..ee20e12b9663 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3505,6 +3505,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val WRAP_EXISTS_IN_AGGREGATE_FUNCTION =
+    buildConf("spark.sql.optimizer.wrapExistsInAggregateFunction")
+      .internal()
+      .doc("When true, the optimizer will wrap newly introduced `exists` 
attributes in an " +
+      "aggregate function to ensure that Aggregate nodes preserve semantic 
invariant that each " +
+      "variable among agg expressions appears either in grouping expressions 
or belongs to " +
+      "and aggregate function.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val ALWAYS_INLINE_ONE_ROW_RELATION_SUBQUERY =
     
buildConf("spark.sql.optimizer.optimizeOneRowRelationSubquery.alwaysInline")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to