This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 835689e0c343 [SPARK-47672][SQL] Avoid double eval from filter pushDown
w/ projection pushdown
835689e0c343 is described below
commit 835689e0c343dc13c715c1b25aa5e16c70e3aed7
Author: Holden Karau <[email protected]>
AuthorDate: Fri Feb 27 14:13:07 2026 -0800
[SPARK-47672][SQL] Avoid double eval from filter pushDown w/ projection
pushdown
### What changes were proposed in this pull request?
Changes the filter pushDown optimizer to not push down past projections of
the same element if we reasonably expect that computing that element is likely
to be expensive.
This is a slightly complex alternative to
https://github.com/apache/spark/pull/45802 which also moves parts of
projections down so that the filters can move further down.
An expression can indicate if it is too expensive to be worth the potential
savings of being double evaluated as a result of pushdown (by default we do
this for all UDFs).
### Future Work / What else remains to do?
Right now if a cond is expensive and it references something in the
projection we don't push down. We could probably do better and gate this on
whether the thing we are referencing is expensive rather than the condition
itself. We could do this as a follow up item or as part of this PR.
### Why are the changes needed?
Currently Spark may double compute expensive operations (like json parsing,
UDF eval, etc.) as a result of filter pushdown past projections.
### Does this PR introduce _any_ user-facing change?
SQL optimizer change may impact some user queries, results should be the
same and hopefully a little faster.
### How was this patch tested?
New tests were added to the FilterPushDownSuite, and the initial problem of
double evaluation was confirmed with a github gist
### Was this patch authored or co-authored using generative AI tooling?
Used claude to generate more test coverage.
Closes #46143 from
holdenk/SPARK-47672-avoid-double-eval-from-filter-pushdown-split-projection.
Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Co-authored-by: Claude <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
---
.../sql/catalyst/expressions/AliasHelper.scala | 26 +++++
.../expressions/CallMethodViaReflection.scala | 3 +
.../sql/catalyst/expressions/Expression.scala | 9 ++
.../spark/sql/catalyst/expressions/ScalaUDF.scala | 2 +
.../expressions/collectionOperations.scala | 2 +
.../catalyst/expressions/regexpExpressions.scala | 25 +++++
.../spark/sql/catalyst/optimizer/Optimizer.scala | 75 +++++++++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 10 ++
.../catalyst/optimizer/FilterPushdownSuite.scala | 109 ++++++++++++++++++++-
9 files changed, 256 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
index 34393aaca7c6..2340385dcdd6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
@@ -65,6 +65,32 @@ trait AliasHelper {
})
}
+ /**
+ * Replace all attributes, that reference an alias, with the aliased
expression.
+ * Tracks which aliases were replaced and returns them.
+ */
+ protected def replaceAliasWhileTracking(
+ expr: Expression,
+ aliasMap: AttributeMap[Alias]): (Expression, AttributeMap[Alias]) = {
+ // Use transformUp to prevent infinite recursion when the replacement
expression
+ // redefines the same ExprId.
+ var replaced = AttributeMap.empty[Alias]
+ val newExpr = trimAliases(expr.transformUp {
+ case a: Attribute =>
+ // If we replace an alias add it to replaced
+ val newElem = aliasMap.get(a)
+ newElem match {
+ case None => a
+ case Some(b) =>
+ if (!replaced.contains(a)) {
+ replaced += (a, b)
+ }
+ b
+ }
+ })
+ (newExpr, replaced)
+ }
+
/**
* Replace all attributes, that reference an alias, with the aliased
expression,
* but keep the name of the outermost attribute.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
index cf34ceefdfee..94d9b305d3b0 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
@@ -67,6 +67,9 @@ case class CallMethodViaReflection(
with CodegenFallback
with QueryErrorsBase {
+ // This could be pretty much anything.
+ override def expensive: Boolean = true
+
def this(children: Seq[Expression]) =
this(children, true)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index b61f7ee0ee16..834f3b0debd0 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -404,6 +404,15 @@ abstract class Expression extends TreeNode[Expression] {
} else {
""
}
+
+ /**
+ * Mark if an expression is likely to be expensive.
+ * The current only consumer of this is the pushdown optimizer.
+ * By default an expression is expensive if any of its children are
expensive.
+ */
+ def expensive: Boolean = hasExpensiveChild
+
+ protected lazy val hasExpensiveChild: Boolean = children.exists(_.expensive)
}
object ExpressionPatternBitMask {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index bba3d4b1a806..b4dd41092871 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -1210,4 +1210,6 @@ case class ScalaUDF(
override protected def withNewChildrenInternal(newChildren:
IndexedSeq[Expression]): ScalaUDF =
copy(children = newChildren)
+
+ override def expensive: Boolean = true
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 6dddd9e6646c..dc3e6dcbd388 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2293,6 +2293,8 @@ case class ArrayJoin(
override def dataType: DataType =
array.dataType.asInstanceOf[ArrayType].elementType
override def prettyName: String = "array_join"
+
+ override def expensive: Boolean = true
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index c6e5c480f3c2..5ad360a54e8d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -83,6 +83,31 @@ abstract class StringRegexExpression extends BinaryExpression
matches(regex, input1.asInstanceOf[UTF8String].toString)
}
}
+
+ override def expensive: Boolean = hasExpensiveChild || _expensiveRegex
+
+ // Heuristic, not designed to be perfect. Look for things likely to have
+ // backtracking.
+ private val detectExpensiveRegexPattern = Pattern.compile("\\+\\*\\{")
+
+ private lazy val _expensiveRegex = {
+ // A quick heuristic for how expensive a pattern is.
+ left match {
+ case StringLiteral(str) =>
+ // If we have a clear start, limited backtracking is required.
+ if (str.startsWith("^") || str.startsWith("\\b")) {
+ false
+ } else if (detectExpensiveRegexPattern.matcher(str).matches()) {
+ // Greedy matching can be tricky.
+ true
+ } else {
+ // Default to pushdown for now.
+ false
+ }
+ case _ =>
+ true // per row regex compilation.
+ }
+ }
}
private[catalyst] object StringRegexExpression {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 712d02ac8eca..c953258cff2f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -2059,23 +2059,92 @@ object PushDownPredicates extends Rule[LogicalPlan] {
* Pushes [[Filter]] operators through many operators iff:
* 1) the operator is deterministic
* 2) the predicate is deterministic and the operator will not change any of
rows.
+ * 3) We don't add double evaluation OR double evaluation would be cheap OR
we're configured to allow it.
*
- * This heuristic is valid assuming the expression evaluation cost is minimal.
*/
object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with
PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
+ // Projections are a special case because the filter _may_ contain
references to fields added in
+ // the projection that we wish to copy. We shouldn't blindly copy
everything
+ // since double evaluation of all operations can be expensive (unless the
broken behavior is
+ // enabled by the user). The double filter eval regression was added in
Spark 3 and fixed in 4.2.
+ // The _new_ default algorithm works as follows:
+ // Provided filters are broken up based on their &&s for separate
evaluation.
+ // We track which components of the projection are used in the filters.
+ //
+ // 1) The filter does not reference anything in the projection: pushed
+ // 2) Filters which reference _inexpensive_ items in projection: pushed and
reference resolved
+ // resulting in double evaluation, but only of inexpensive items --
worth it to filter
+ // records sooner.
+ // (Case 1 & 2 are treated as "cheap" predicates)
+ // 3) When a filter references expensive-to-compute items we do
not push it.
+ // Note that a given filter may contain parts (separated by logical ands)
from all cases.
+ // We handle each part separately according to the logic above.
+ // Additional restriction:
// SPARK-13473: We can't push the predicate down when the underlying
projection output non-
// deterministic field(s). Non-deterministic expressions are essentially
stateful. This
// implies that, for a given input row, the output are determined by the
expression's initial
// state and all the input rows processed before. In another word, the
order of input rows
// matters for non-deterministic expressions, while pushing down
predicates changes the order.
// This also applies to Aggregate.
- case Filter(condition, project @ Project(fields, grandChild))
+ case f @ Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild,
condition) =>
+ // All of the aliases in the projection
val aliasMap = getAliasMap(project)
- project.copy(child = Filter(replaceAlias(condition, aliasMap),
grandChild))
+ if (!SQLConf.get.avoidDoubleFilterEval) {
+ // If the user is ok with double evaluation of projections short
circuit
+ project.copy(child = Filter(replaceAlias(condition, aliasMap),
grandChild))
+ } else {
+ // Break up the filter into its respective components by &&s.
+ val splitCondition = splitConjunctivePredicates(condition)
+ // Find the different aliases each component of the filter uses.
+ val originalAndRewrittenConditionsWithUsedAlias = splitCondition.map {
cond =>
+ // Here we get which aliases were used in a given filter so we can
see if the filter
+ // referenced an expensive alias vs. just checking if the filter is
expensive.
+ val (replaced, usedAliases) = replaceAliasWhileTracking(cond,
aliasMap)
+ (cond, usedAliases, replaced)
+ }
+ // Split the filter's components into cheap and expensive while
keeping track of
+ // what each references from the projection.
+ val (cheapWithUsed, expensiveWithUsed) =
originalAndRewrittenConditionsWithUsedAlias
+ .partition { case (cond, used, replaced) =>
+ // Didn't use anything? We're good
+ if (used.isEmpty) {
+ true
+ } else if (!used.exists(_._2.child.expensive)) {
+ // If it's cheap we can push it because it might eliminate more
data quickly and
+ // it may also be something which could be evaluated at the
storage layer.
+ // We may wish to improve this heuristic in the future.
+ true
+ } else {
+ false
+ }
+ }
+ // Short circuit: if we do not have any cheap filters, return the
original filter as is.
+ if (cheapWithUsed.isEmpty) {
+ f
+ } else {
+ val cheap: Seq[Expression] = cheapWithUsed.map(_._3)
+ // Make a base instance which has all of the cheap filters pushed
down.
+ // For all filters which do not reference any expensive aliases we
+ // just push the filter while resolving the non-expensive aliases.
+ val combinedCheapFilter = cheap.reduce(And)
+ val baseChild = Filter(combinedCheapFilter, child = grandChild)
+ // Take our projection and place it on top of the pushed filters.
+ val topProjection = project.copy(child = baseChild)
+
+ // If we pushed all the filters we can return the projection
+ if (expensiveWithUsed.isEmpty) {
+ topProjection
+ } else {
+ // Finally add any filters which could not be pushed
+ val remainingConditions = expensiveWithUsed.map(_._1)
+ Filter(remainingConditions.reduce(And), topProjection)
+ }
+ }
+ }
// We can push down deterministic predicate through Aggregate, including
throwable predicate.
// If we can push down a filter through Aggregate, it means the filter
only references the
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1874ff195516..1d59c7a08786 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -482,6 +482,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val AVOID_DOUBLE_FILTER_EVAL =
+ buildConf("spark.sql.optimizer.avoidDoubleFilterEval")
+ .doc("When true avoid pushing expensive (UDF, etc.) filters down if it
could result in " +
+ "double evaluation. This was the behaviour prior to 3.X.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(true)
+
val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules")
.doc("Configures a list of rules to be disabled in the optimizer, in which
the rules are " +
"specified by their rule names and separated by comma. It is not
guaranteed that all the " +
@@ -8015,6 +8023,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def preserveCharVarcharTypeInfo: Boolean =
getConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO)
+ def avoidDoubleFilterEval: Boolean = getConf(AVOID_DOUBLE_FILTER_EVAL)
+
def readSideCharPadding: Boolean = getConf(SQLConf.READ_SIDE_CHAR_PADDING)
def cliPrintHeader: Boolean = getConf(SQLConf.CLI_PRINT_HEADER)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 9424ecda0ed8..f457eced4d98 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -52,11 +53,14 @@ class FilterPushdownSuite extends PlanTest {
val attrB = $"b".int
val attrC = $"c".int
val attrD = $"d".int
+ val attrE = $"e".string
val testRelation = LocalRelation(attrA, attrB, attrC)
val testRelation1 = LocalRelation(attrD)
+ val testStringRelation = LocalRelation(attrA, attrB, attrE)
+
val simpleDisjunctivePredicate =
("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr >
11)
val expectedPredicatePushDownResult = {
@@ -152,7 +156,7 @@ class FilterPushdownSuite extends PlanTest {
test("can't push without rewrite") {
val originalQuery =
testRelation
- .select($"a" + $"b" as "e")
+ .select($"a" + $"b" as "e", $"a" - $"b" as "f")
.where($"e" === 1)
.analyze
@@ -160,9 +164,110 @@ class FilterPushdownSuite extends PlanTest {
val correctAnswer =
testRelation
.where($"a" + $"b" === 1)
- .select($"a" + $"b" as "e")
+ .select($"a" + $"b" as "e", $"a" - $"b" as "f")
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("SPARK-47672: Do double evaluation when configured") {
+ withSQLConf(SQLConf.AVOID_DOUBLE_FILTER_EVAL.key -> "false") {
+ val originalQuery = testStringRelation
+ .select($"a", $"e".rlike("magic") as "f", $"e".rlike("notmagic") as
"j", $"b")
+ .where($"a" > 5 && $"f")
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ val correctAnswer = testStringRelation
+ .where($"a" > 5 && $"e".rlike("magic"))
+ .select($"a", $"e".rlike("magic") as "f", $"e".rlike("notmagic") as
"j", $"b")
.analyze
+ comparePlans(optimized, correctAnswer)
+ }
+ }
+
+ test("SPARK-47672: Make sure that we handle the case where everything is
expensive") {
+ val originalQuery = testStringRelation
+ .select($"e".rlike("magic") as "f")
+ .where($"f")
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ // Nothing changes when everything is expensive.
+ val correctAnswer = originalQuery
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ // Case 1: Multiple filters that don't reference any projection aliases -
all should be pushed
+ test("SPARK-47672: Case 1 - multiple filters not referencing projection
aliases") {
+ val originalQuery = testStringRelation
+ .select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d", $"a", $"b")
+ .where($"a" > 5 && $"b" < 10)
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ // Both filters on c and d should be pushed down since they just reference
+ // simple aliases (c->a, d->b) which are inexpensive
+ val correctAnswer = testStringRelation
+ .where($"a" > 5 && $"b" < 10)
+ .select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d", $"a", $"b")
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ // Case 2: Multiple filters with inexpensive references - all should be
pushed
+ test("SPARK-47672: Case 2 - multiple filters with inexpensive alias
references") {
+ val originalQuery = testStringRelation
+ .select($"a" + $"b" as "sum", $"a" - $"b" as "diff", $"e".rlike("magic")
as "f")
+ .where($"sum" > 10 && $"diff" < 5)
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ // Both sum and diff are inexpensive (arithmetic), so both filters should
be pushed
+ val correctAnswer = testStringRelation
+ .where($"a" + $"b" > 10 && $"a" - $"b" < 5)
+ .select($"a" + $"b" as "sum", $"a" - $"b" as "diff", $"e".rlike("magic")
as "f")
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ // Case 3: Filter references expensive to compute references.
+ test("SPARK-47672: Avoid double evaluation with projections can't push past
certain items") {
+ val originalQuery = testStringRelation
+ .select($"a", $"e".rlike("magic") as "f")
+ .where($"a" > 5 || $"f")
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ comparePlans(optimized, originalQuery)
+ }
+
+ // Combined case 1, 2, and 3 filter pushdown
+ test("SPARK-47672: Case 1, 2, and 3 make sure we leave up and push down
correctly.") {
+ val originalQuery = testStringRelation
+ .select($"a" + $"b" as "sum", $"a" - $"b" as "diff", $"e".rlike("magic")
as "f")
+ .where($"sum" > 10 && $"diff" < 5 && $"f")
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ // Both sum and diff are inexpensive (arithmetic), so both pushed
+ // rlike magic is expensive so not pushed.
+ val correctAnswer = testStringRelation
+ .where($"a" + $"b" > 10 && $"a" - $"b" < 5)
+ .select($"a" + $"b" as "sum", $"a" - $"b" as "diff", $"e".rlike("magic")
as "f")
+ .where($"f")
+ .analyze
+
comparePlans(optimized, correctAnswer)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]