This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 8145c013cc51 [SPARK-56032][SQL][FOLLOWUP] Add conf to gate
subexpression elimination in FilterExec codegen
8145c013cc51 is described below
commit 8145c013cc5186a49f264fff486e82a9cd063f64
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jun 3 01:27:29 2026 +0800
[SPARK-56032][SQL][FOLLOWUP] Add conf to gate subexpression elimination in
FilterExec codegen
### What changes were proposed in this pull request?
Followup to https://github.com/apache/spark/pull/56209.
This adds an internal conf
`spark.sql.subexpressionElimination.filterExec.enabled` (default `true`) that
gates subexpression elimination (CSE) in `FilterExec` whole-stage codegen
specifically. When set to `false`, `FilterExec` falls back to the existing
predicate codegen path (`generatePredicateCode`) that loads input columns
lazily and short-circuits; subexpression elimination elsewhere (e.g.
`ProjectExec`) is unaffected. The conf is checked alongside the existing
`spark.sql.subexpre [...]
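For illustration, a minimal sketch of flipping the gate for a single session. It assumes a Spark build that includes this commit is on the classpath; the `master` and `appName` values are placeholders and not part of this change.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a Spark build with this commit; master/appName are placeholders.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("filter-exec-cse-gate")
  .getOrCreate()

// Keep global subexpression elimination on; turn it off for FilterExec codegen only.
spark.conf.set("spark.sql.subexpressionElimination.enabled", "true")
spark.conf.set("spark.sql.subexpressionElimination.filterExec.enabled", "false")
```

Since the conf is internal, a sketch like this is mainly useful for diagnosing a suspected regression rather than as a routine tuning knob.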
### Why are the changes needed?
CSE in `FilterExec` codegen (SPARK-56032) can introduce a performance
regression for some filters. To materialize eliminated subexpressions into
shared variables, the CSE path emits an eager prologue (`inputVarsEvalCode`)
that evaluates every input column referenced by the predicates at the top of
the per-row loop. The non-CSE path instead loads each column lazily, right
before the predicate that needs it, so a cheap, highly selective leading
predicate can short-circuit and skip decod [...]
The eager prologue defeats that short-circuiting. When a filter has
expensive-to-decode columns behind a cheaper, selective predicate -- e.g.
high-precision decimals, which allocate a `BigInteger`/`BigDecimal` per decode
-- eagerly decoding those columns for every row, including rows the cheap
predicate would reject, is pure waste. This showed up as a measurable
regression on TPC-DS q28.
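The cost difference can be sketched with a toy model in plain Scala. This is not Spark code: `RawRow`, `Decoder`, `eagerFilter`, and `lazyFilter` are hypothetical names, and a string-encoded decimal stands in for an expensive-to-decode column. It only shows how decoding up front defeats short-circuiting.

```scala
// Toy model, not Spark code: all names here are hypothetical.
object EagerVsLazyDecode {
  // A cheap int column plus a decimal kept in an encoded (string) form.
  final case class RawRow(cheap: Int, encodedDecimal: String)

  // Counts how many times the expensive decode runs.
  final class Decoder {
    var calls: Int = 0
    def decode(s: String): BigDecimal = { calls += 1; BigDecimal(s) }
  }

  private val threshold = BigDecimal(10)

  // Mirrors the eager prologue: every referenced column is decoded before any predicate runs.
  def eagerFilter(rows: Seq[RawRow], d: Decoder): Seq[RawRow] = rows.filter { r =>
    val cheap = r.cheap
    val dec = d.decode(r.encodedDecimal)
    cheap == 1 && dec > threshold
  }

  // Mirrors the lazy path: the decimal is decoded only if the cheap predicate passes.
  def lazyFilter(rows: Seq[RawRow], d: Decoder): Seq[RawRow] = rows.filter { r =>
    r.cheap == 1 && d.decode(r.encodedDecimal) > threshold
  }

  // 1000 rows; `cheap == 1` holds for 10 of them.
  val rows: Seq[RawRow] = (0 until 1000).map(i => RawRow(i % 100, s"$i.5"))

  def main(args: Array[String]): Unit = {
    val eager = new Decoder
    val lzy = new Decoder
    val keptEager = eagerFilter(rows, eager)
    val keptLazy = lazyFilter(rows, lzy)
    println(s"same result: ${keptEager == keptLazy}")
    println(s"eager decodes: ${eager.calls}, lazy decodes: ${lzy.calls}")
  }
}
```

Both filters keep the same rows, but in this model the eager variant decodes all 1000 rows while the lazy variant decodes only the 10 that pass the cheap predicate.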
https://github.com/apache/spark/pull/56209 already removes the prologue
when there is no common subexpression at all. But even when a common
subexpression exists, the eager prologue can still regress if the savings from
eliminating it don't outweigh the lost short-circuiting. The global
`spark.sql.subexpressionElimination.enabled` flag is too coarse to address this
-- turning it off also disables CSE for projections and other operators. This
conf provides a targeted kill-switch to fal [...]
### Does this PR introduce _any_ user-facing change?
No. The conf is internal and defaults to `true`, preserving the current
behavior.
### How was this patch tested?
New unit test in `WholeStageCodegenSuite`: with a genuine common
subexpression in the filter predicates, flipping
`spark.sql.subexpressionElimination.filterExec.enabled` off (while leaving
global subexpression elimination on) makes `FilterExec` fall back to the lazy
non-CSE path -- the shared subexpression is re-evaluated per use, matching the
code generated when CSE is globally disabled. Existing SPARK-56032 FilterExec
CSE tests continue to exercise the default-on path.
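The counting approach used by the test can be sketched on its own. The two strings below are hand-written stand-ins, not real `FilterExec` output (actual generated code is much longer and shaped differently); the variable names inside them and the `CountAddExact` object are hypothetical.

```scala
// Hand-written stand-ins for generated code; real FilterExec output differs.
object CountAddExact {
  // CSE-style shape: the shared `a + b` is computed once and reused.
  val withCse: String =
    """long sum = java.lang.Math.addExact(a, b);
      |if (!(sum > lo)) continue;
      |if (!(sum < hi)) continue;
      |if (!(sum != mid)) continue;""".stripMargin

  // Lazy-style shape: each predicate re-evaluates `a + b`.
  val withoutCse: String =
    """if (!(java.lang.Math.addExact(a, b) > lo)) continue;
      |if (!(java.lang.Math.addExact(a, b) < hi)) continue;
      |if (!(java.lang.Math.addExact(a, b) != mid)) continue;""".stripMargin

  // Number of times `addExact` appears in a piece of code.
  def occurrences(code: String): Int = "addExact".r.findAllIn(code).length
}
```

Comparing counts with `<` rather than asserting exact numbers keeps such a check robust to unrelated changes in the generated code.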
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Claude Code)
Closes #56271 from cloud-fan/SPARK-56032-filterExec-cse-conf.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6d61d4699848a1846497843a4455e361aa79101b)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++
.../sql/execution/basicPhysicalOperators.scala | 6 ++-
.../sql/execution/WholeStageCodegenSuite.scala | 47 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7fadd084fe72..a21653a011b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1362,6 +1362,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val SUBEXPRESSION_ELIMINATION_FILTER_EXEC_ENABLED =
+ buildConf("spark.sql.subexpressionElimination.filterExec.enabled")
+ .internal()
+ .doc("When true (and subexpression elimination is enabled), FilterExec whole-stage " +
+ "codegen eliminates common subexpressions shared across its predicates. When false, " +
+ "FilterExec falls back to the predicate codegen that loads input columns lazily and " +
+ "short-circuits, avoiding eager materialization of all predicate-referenced columns on " +
+ "every row. Only affects FilterExec; subexpression elimination elsewhere is unaffected.")
+ .version("4.2.0")
+ .withBindingPolicy(ConfigBindingPolicy.SESSION)
+ .booleanConf
+ .createWithDefault(true)
+
val CASE_SENSITIVE = buildConf(SqlApiConfHelper.CASE_SENSITIVE_KEY)
.internal()
.doc("Whether the query analyzer should be case sensitive or not. " +
@@ -8052,6 +8065,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def subexpressionEliminationSkipForShotcutExpr: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_SKIP_FOR_SHORTCUT_EXPR)
+ def subexpressionEliminationFilterExecEnabled: Boolean =
+ getConf(SUBEXPRESSION_ELIMINATION_FILTER_EXEC_ENABLED)
+
def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
def limitInitialNumPartitions: Int = getConf(LIMIT_INITIAL_NUM_PARTITIONS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 8d183f915e8a..9ed9c312a4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -315,8 +315,12 @@ case class FilterExec(condition: Expression, child: SparkPlan)
// predicate that needs them. With no common subexpression the prologue is pure overhead
// (e.g. decoding a decimal column for rows a cheaper earlier predicate would reject), so we
// fall back to `generatePredicateCode`.
+ //
+ // `subexpressionElimination.filterExec.enabled` additionally gates this path so it can be
+ // turned off independently of subexpression elimination elsewhere.
val (prologueCode, predicateCode) =
- if (conf.subexpressionEliminationEnabled && otherPreds.nonEmpty &&
+ if (conf.subexpressionEliminationEnabled && conf.subexpressionEliminationFilterExecEnabled &&
+ otherPreds.nonEmpty &&
otherPredsEquivalentExpressions.getCommonSubexpressions.nonEmpty) {
// Pre-evaluate input variables before CSE analysis: CSE clears
// ctx.currentVars[i].code as a side effect; without this pre-evaluation, Janino
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 8f0ec0ffd6f1..a83d5c99bb5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -1224,4 +1224,51 @@ class WholeStageCodegenSuite extends SharedSparkSession
"With no common subexpression, CSE-enabled FilterExec codegen should be identical to " +
"CSE-disabled codegen (i.e. fall back to the lazy, short-circuiting non-CSE path)")
}
+
+ test("SPARK-56032: subexpressionElimination.filterExec.enabled gates FilterExec CSE " +
+ "independently of subexpression elimination") {
+ // The conf disables CSE specifically for FilterExec while leaving subexpression elimination
+ // enabled elsewhere. With a genuine common subexpression in the predicates, turning the conf
+ // off should make FilterExec fall back to the lazy non-CSE path (re-evaluating the shared
+ // subexpression per use), matching the code generated when CSE is globally disabled.
+ val schema = StructType(Seq(
+ StructField("a", DayTimeIntervalType(), nullable = true),
+ StructField("b", DayTimeIntervalType(), nullable = true)))
+ val data = spark.sparkContext.parallelize(Seq(
+ Row(Duration.ofDays(1), Duration.ofDays(5)),
+ Row(Duration.ofDays(5), Duration.ofDays(6)),
+ Row(Duration.ofDays(2), Duration.ofDays(3))))
+ val expected = data.collect().toSeq
+
+ // `a + b` appears three times in the predicate, so it is a CSE candidate. We count `addExact`
+ // occurrences in the generated code: the CSE path evaluates it once, the lazy path per use.
+ def filterCode(filterExecCseEnabled: Boolean): String = {
+ withSQLConf(
+ // Subexpression elimination stays globally on; only the FilterExec gate flips.
+ SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> "true",
+ SQLConf.SUBEXPRESSION_ELIMINATION_FILTER_EXEC_ENABLED.key ->
+ filterExecCseEnabled.toString,
+ SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ val df = spark.createDataFrame(data, schema)
+ val filtered = df.where(
+ "a IS NOT NULL AND (a + b) > INTERVAL '3' DAY " +
+ "AND (a + b) < INTERVAL '15' DAY AND (a + b) != INTERVAL '10' DAY")
+ val plan = filtered.queryExecution.executedPlan
+ assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]),
+ "Filter should be in whole-stage codegen")
+ checkAnswer(filtered, expected)
+ codegenString(plan)
+ }
+ }
+
+ val addExactPattern = "addExact".r
+ val enabledCount = addExactPattern.findAllIn(filterCode(filterExecCseEnabled = true)).length
+ val disabledCount = addExactPattern.findAllIn(filterCode(filterExecCseEnabled = false)).length
+ // With the gate on, CSE collapses the repeated `a + b` evaluations; with the gate off,
+ // FilterExec falls back to the lazy path that re-evaluates per use.
+ assert(enabledCount < disabledCount,
+ s"subexpressionElimination.filterExec.enabled should reduce repeated evaluation: " +
+ s"addExact appears $enabledCount times when enabled vs $disabledCount times when disabled")
+ }
}