This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c6538301c0a8 [SPARK-47070][SQL][FOLLOW-UP] Add a flag guarding a
subquery in aggregate rewrite
c6538301c0a8 is described below
commit c6538301c0a8f82991acb01c684d15ba25b7c377
Author: Anton Lykov <[email protected]>
AuthorDate: Thu Mar 7 10:56:08 2024 +0900
[SPARK-47070][SQL][FOLLOW-UP] Add a flag guarding a subquery in aggregate
rewrite
### What changes were proposed in this pull request?
Add a flag that guards a recently introduced new codepath inside the optimizer
that wraps `exists` variables into an agg function. See
[#45133](https://github.com/apache/spark/pull/45133) for details.
### Why are the changes needed?
Guarding a new QO codepath with a flag.
### Does this PR introduce _any_ user-facing change?
No; only a new internal conf.
### How was this patch tested?
No additional tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45412 from anton5798/agg-subquery-fup.
Lead-authored-by: Anton Lykov <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/catalyst/optimizer/subquery.scala | 5 +++--
.../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 796c10b47012..c97e0aedf8c6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreePattern.{EXISTS_SUBQUERY,
IN_SUBQUERY, LATERAL_JOIN, LIST_SUBQUERY, PLAN_EXPRESSION, SCALAR_SUBQUERY}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
-import
org.apache.spark.sql.internal.SQLConf.{DECORRELATE_PREDICATE_SUBQUERIES_IN_JOIN_CONDITION,
OPTIMIZE_UNCORRELATED_IN_SUBQUERIES_IN_JOIN_CONDITION}
+import
org.apache.spark.sql.internal.SQLConf.{DECORRELATE_PREDICATE_SUBQUERIES_IN_JOIN_CONDITION,
OPTIMIZE_UNCORRELATED_IN_SUBQUERIES_IN_JOIN_CONDITION,
+ WRAP_EXISTS_IN_AGGREGATE_FUNCTION}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -257,7 +258,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan]
with PredicateHelper {
newExpr.get
}).withNewChildren(Seq(newChild))
updatedNode match {
- case a: Aggregate =>
+ case a: Aggregate if conf.getConf(WRAP_EXISTS_IN_AGGREGATE_FUNCTION) =>
// If we have introduced new `exists`-attributes that are referenced
by
// aggregateExpressions within a non-aggregateFunction expression,
we wrap them in
// first() aggregate function. first() is Spark's executable version
of any_value()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5903e783b967..ee20e12b9663 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3505,6 +3505,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val WRAP_EXISTS_IN_AGGREGATE_FUNCTION =
+ buildConf("spark.sql.optimizer.wrapExistsInAggregateFunction")
+ .internal()
+ .doc("When true, the optimizer will wrap newly introduced `exists`
attributes in an " +
+ "aggregate function to ensure that Aggregate nodes preserve semantic
invariant that each " +
+ "variable among agg expressions appears either in grouping expressions
or belongs to " +
+ "and aggregate function.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(true)
+
val ALWAYS_INLINE_ONE_ROW_RELATION_SUBQUERY =
buildConf("spark.sql.optimizer.optimizeOneRowRelationSubquery.alwaysInline")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]