This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7796d8a6331 [SPARK-45531][SQL][DOCS] Add more comments and rename some
variable name for InjectRuntimeFilter
7796d8a6331 is described below
commit 7796d8a63318d560b08d4d6a8b4d68ea0112bd3e
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Oct 16 15:40:17 2023 +0800
[SPARK-45531][SQL][DOCS] Add more comments and rename some variable name
for InjectRuntimeFilter
### What changes were proposed in this pull request?
After many improvements, `InjectRuntimeFilter` has become fairly complex. We need
to add more comments that explain its design details and rename some variables so
that `InjectRuntimeFilter` has better readability and maintainability.
The core of a runtime filter is the join keys, but the suffix `Exp` is not
intuitive, so it is better to use the suffix `Key` directly. The variables are
renamed as follows:
`filterApplicationSideExp` -> `filterApplicationSideKey`
`filterCreationSideExp` -> `filterCreationSideKey`
`findBloomFilterWithExp` -> `findBloomFilterWithKey`
`expr` -> `joinKey`
### Why are the changes needed?
To improve the readability and maintainability of `InjectRuntimeFilter`.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43359 from beliefer/SPARK-45531.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/optimizer/InjectRuntimeFilter.scala | 76 ++++++++++++----------
1 file changed, 40 insertions(+), 36 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 614ab4a1d01..8737082e571 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -29,48 +29,50 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
- * Insert a filter on one side of the join if the other side has a selective
predicate.
- * The filter could be an IN subquery (converted to a semi join), a bloom
filter, or something
- * else in the future.
+ * Insert a runtime filter on one side of the join (we call this side the
application side) if
+ * we can extract a runtime filter from the other side (creation side). A
simple case is that
+ * the creation side is a table scan with a selective filter.
+ * The runtime filter is logically an IN subquery with the join keys
(converted to a semi join),
+ * but can be something different physically, such as a bloom filter.
*/
object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with
JoinSelectionHelper {
- // Wraps `expr` with a hash function if its byte size is larger than an
integer.
- private def mayWrapWithHash(expr: Expression): Expression = {
- if (expr.dataType.defaultSize > IntegerType.defaultSize) {
- new Murmur3Hash(Seq(expr))
+ // Wraps `joinKey` with a hash function if its byte size is larger than an
integer.
+ private def mayWrapWithHash(joinKey: Expression): Expression = {
+ if (joinKey.dataType.defaultSize > IntegerType.defaultSize) {
+ new Murmur3Hash(Seq(joinKey))
} else {
- expr
+ joinKey
}
}
private def injectFilter(
- filterApplicationSideExp: Expression,
+ filterApplicationSideKey: Expression,
filterApplicationSidePlan: LogicalPlan,
- filterCreationSideExp: Expression,
+ filterCreationSideKey: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan = {
require(conf.runtimeFilterBloomFilterEnabled ||
conf.runtimeFilterSemiJoinReductionEnabled)
if (conf.runtimeFilterBloomFilterEnabled) {
injectBloomFilter(
- filterApplicationSideExp,
+ filterApplicationSideKey,
filterApplicationSidePlan,
- filterCreationSideExp,
+ filterCreationSideKey,
filterCreationSidePlan
)
} else {
injectInSubqueryFilter(
- filterApplicationSideExp,
+ filterApplicationSideKey,
filterApplicationSidePlan,
- filterCreationSideExp,
+ filterCreationSideKey,
filterCreationSidePlan
)
}
}
private def injectBloomFilter(
- filterApplicationSideExp: Expression,
+ filterApplicationSideKey: Expression,
filterApplicationSidePlan: LogicalPlan,
- filterCreationSideExp: Expression,
+ filterCreationSideKey: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan = {
// Skip if the filter creation side is too big
if (filterCreationSidePlan.stats.sizeInBytes >
conf.runtimeFilterCreationSideThreshold) {
@@ -79,9 +81,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
val rowCount = filterCreationSidePlan.stats.rowCount
val bloomFilterAgg =
if (rowCount.isDefined && rowCount.get.longValue > 0L) {
- new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
rowCount.get.longValue)
+ new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideKey)),
rowCount.get.longValue)
} else {
- new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
+ new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideKey)))
}
val alias = Alias(bloomFilterAgg.toAggregateExpression(), "bloomFilter")()
@@ -89,26 +91,26 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias),
filterCreationSidePlan)))
val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
val filter = BloomFilterMightContain(bloomFilterSubquery,
- new XxHash64(Seq(filterApplicationSideExp)))
+ new XxHash64(Seq(filterApplicationSideKey)))
Filter(filter, filterApplicationSidePlan)
}
private def injectInSubqueryFilter(
- filterApplicationSideExp: Expression,
+ filterApplicationSideKey: Expression,
filterApplicationSidePlan: LogicalPlan,
- filterCreationSideExp: Expression,
+ filterCreationSideKey: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan = {
- require(filterApplicationSideExp.dataType ==
filterCreationSideExp.dataType)
- val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
+ require(filterApplicationSideKey.dataType ==
filterCreationSideKey.dataType)
+ val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideKey)
val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
val aggregate =
- ColumnPruning(Aggregate(Seq(filterCreationSideExp), Seq(alias),
filterCreationSidePlan))
+ ColumnPruning(Aggregate(Seq(filterCreationSideKey), Seq(alias),
filterCreationSidePlan))
if (!canBroadcastBySize(aggregate, conf)) {
// Skip the InSubquery filter if the size of `aggregate` is beyond
broadcast join threshold,
// i.e., the semi-join will be a shuffled join, which is not worthwhile.
return filterApplicationSidePlan
}
- val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
+ val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideKey)),
ListQuery(aggregate, numCols = aggregate.output.length))
Filter(filter, filterApplicationSidePlan)
}
@@ -117,11 +119,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
* Extracts a sub-plan which is a simple filter over scan from the input
plan. The simple
* filter should be selective and the filter condition (including
expressions in the child
* plan referenced by the filter condition) should be a simple expression,
so that we do
- * not add a subquery that might have an expensive computation.
+ * not add a subquery that might have an expensive computation. The
extracted sub-plan should
+ * produce a superset of the entire creation side output data, so that it's
still correct to
+ * use the sub-plan to build the runtime filter to prune the application
side.
*/
private def extractSelectiveFilterOverScan(
plan: LogicalPlan,
- filterCreationSideExp: Expression): Option[LogicalPlan] = {
+ filterCreationSideKey: Expression): Option[LogicalPlan] = {
@tailrec
def extract(
p: LogicalPlan,
@@ -157,10 +161,10 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
// Runtime filters use one side of the [[Join]] to build a set of join
key values and prune
// the other side of the [[Join]]. It's also OK to use a superset of
the join key values
// (ignore null values) to do the pruning.
- if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
+ if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
extract(left, AttributeSet.empty,
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan =
left)
- } else if
(right.output.exists(_.semanticEquals(filterCreationSideExp))) {
+ } else if
(right.output.exists(_.semanticEquals(filterCreationSideKey))) {
extract(right, AttributeSet.empty,
hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan =
right)
} else {
@@ -226,7 +230,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
/**
* Extracts the beneficial filter creation plan with check show below:
- * - The filterApplicationSideJoinExp can be pushed down through joins,
aggregates and windows
+ * - The filterApplicationSideKey can be pushed down through joins,
aggregates and windows
* (ie the expression references originate from a single leaf node)
* - The filter creation side has a selective predicate
* - The max filterApplicationSide scan size is greater than a configurable
threshold
@@ -234,12 +238,12 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
private def extractBeneficialFilterCreatePlan(
filterApplicationSide: LogicalPlan,
filterCreationSide: LogicalPlan,
- filterApplicationSideExp: Expression,
- filterCreationSideExp: Expression): Option[LogicalPlan] = {
+ filterApplicationSideKey: Expression,
+ filterCreationSideKey: Expression): Option[LogicalPlan] = {
if (findExpressionAndTrackLineageDown(
- filterApplicationSideExp, filterApplicationSide).isDefined &&
+ filterApplicationSideKey, filterApplicationSide).isDefined &&
satisfyByteSizeRequirement(filterApplicationSide)) {
- extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp)
+ extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideKey)
} else {
None
}
@@ -276,10 +280,10 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
right: LogicalPlan,
leftKey: Expression,
rightKey: Expression): Boolean = {
- findBloomFilterWithExp(left, leftKey) || findBloomFilterWithExp(right,
rightKey)
+ findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right,
rightKey)
}
- private def findBloomFilterWithExp(plan: LogicalPlan, key: Expression):
Boolean = {
+ private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression):
Boolean = {
plan.exists {
case Filter(condition, _) =>
splitConjunctivePredicates(condition).exists {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]