This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 892fdc53269 [SPARK-45108][SQL] Improve the InjectRuntimeFilter for
check probably shuffle
892fdc53269 is described below
commit 892fdc532696e703b353c4758320d69162fffe8c
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Sep 21 19:52:40 2023 +0800
[SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably
shuffle
### What changes were proposed in this pull request?
`InjectRuntimeFilter` needs to check whether the join is probably a shuffle
join. But the current code may call `isProbablyShuffleJoin` twice when the right
side of the `Join` node is tried as the filter application side.
### Why are the changes needed?
To avoid the duplicate call of `isProbablyShuffleJoin`.
### Does this PR introduce _any_ user-facing change?
'No'.
This only updates the internal implementation.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #42861 from beliefer/SPARK-45108.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/optimizer/InjectRuntimeFilter.scala | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 44c55860375..13554908379 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -229,19 +229,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
* - The filterApplicationSideJoinExp can be pushed down through joins,
aggregates and windows
* (ie the expression references originate from a single leaf node)
* - The filter creation side has a selective predicate
- * - The current join is a shuffle join or a broadcast join that has a
shuffle below it
* - The max filterApplicationSide scan size is greater than a configurable
threshold
*/
private def extractBeneficialFilterCreatePlan(
filterApplicationSide: LogicalPlan,
filterCreationSide: LogicalPlan,
filterApplicationSideExp: Expression,
- filterCreationSideExp: Expression,
- hint: JoinHint): Option[LogicalPlan] = {
+ filterCreationSideExp: Expression): Option[LogicalPlan] = {
if (findExpressionAndTrackLineageDown(
filterApplicationSideExp, filterApplicationSide).isDefined &&
- (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint)
||
- probablyHasShuffle(filterApplicationSide)) &&
satisfyByteSizeRequirement(filterApplicationSide)) {
extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp)
} else {
@@ -326,15 +322,21 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
isSimpleExpression(l) && isSimpleExpression(r)) {
val oldLeft = newLeft
val oldRight = newRight
- if (canPruneLeft(joinType)) {
- extractBeneficialFilterCreatePlan(left, right, l, r,
hint).foreach {
+ // Check if the current join is a shuffle join or a broadcast join
that
+ // has a shuffle below it
+ val hasShuffle = isProbablyShuffleJoin(left, right, hint)
+ if (canPruneLeft(joinType) && (hasShuffle ||
probablyHasShuffle(left))) {
+ extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
filterCreationSidePlan =>
newLeft = injectFilter(l, newLeft, r, filterCreationSidePlan)
}
}
// Did we actually inject on the left? If not, try on the right
- if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType)) {
- extractBeneficialFilterCreatePlan(right, left, r, l,
hint).foreach {
+ // Check if the current join is a shuffle join or a broadcast join
that
+ // has a shuffle below it
+ if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
+ (hasShuffle || probablyHasShuffle(right))) {
+ extractBeneficialFilterCreatePlan(right, left, r, l).foreach {
filterCreationSidePlan =>
newRight = injectFilter(r, newRight, l,
filterCreationSidePlan)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]