This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8342d7438114 [SPARK-52974][SQL] Don't broadcast dynamicpruning
InSubqueries
8342d7438114 is described below
commit 8342d7438114b7f1bffed39a7ff52900a9014348
Author: Peter Toth <[email protected]>
AuthorDate: Mon Aug 4 18:35:01 2025 +0200
[SPARK-52974][SQL] Don't broadcast dynamicpruning InSubqueries
### What changes were proposed in this pull request?
Currently, when AQE is off, DPP `InSubquery`s are planned by the
`PlanSubqueries` rule as regular `InSubqueryExec(isDynamicPruning = false)`,
which causes the results to be broadcast. But those subquery results are
only ever used at the driver, so there is no need to broadcast the result.
### Why are the changes needed?
Perf improvement when AQE is off, and it brings `PlanDynamicPruningFilters`
on par with `PlanAdaptiveDynamicPruningFilters`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the revised test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51685 from peter-toth/SPARK-52974-fix-dpp-insubqueries-broadcast.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../execution/dynamicpruning/PlanDynamicPruningFilters.scala | 12 ++++++------
.../src/test/scala/org/apache/spark/sql/ExplainSuite.scala | 4 ++--
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 059729d86bfa..fbd341b6e7b8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -17,15 +17,14 @@
package org.apache.spark.sql.execution.dynamicpruning
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq,
BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression,
ListQuery, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq,
BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression,
Literal}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution,
SparkPlan, SubqueryBroadcastExec}
+import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution,
SparkPlan, SubqueryBroadcastExec, SubqueryExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.SQLConf
@@ -56,6 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession:
SparkSession) extends Rule[Sp
case DynamicPruningSubquery(
value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast,
exprId, _) =>
val sparkPlan =
QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan)
+ val name = s"dynamicpruning#${exprId.id}"
// Using `sparkPlan` is a little hacky as it is based on the
assumption that this rule is
// the first to be applied (apart from `InsertAdaptiveSparkPlan`).
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty
&&
@@ -72,7 +72,6 @@ case class PlanDynamicPruningFilters(sparkSession:
SparkSession) extends Rule[Sp
val mode = broadcastMode(buildKeys, executedPlan.output)
// plan a broadcast exchange of the build side of the join
val exchange = BroadcastExchangeExec(mode, executedPlan)
- val name = s"dynamicpruning#${exprId.id}"
// place the broadcast adaptor for reusing the broadcast results on
the probe side
val broadcastValues =
SubqueryBroadcastExec(name, broadcastKeyIndices, buildKeys,
exchange)
@@ -85,8 +84,9 @@ case class PlanDynamicPruningFilters(sparkSession:
SparkSession) extends Rule[Sp
val aliases = broadcastKeyIndices.map(idx =>
Alias(buildKeys(idx), buildKeys(idx).toString)())
val aggregate = Aggregate(aliases, aliases, buildPlan)
- DynamicPruningExpression(expressions.InSubquery(
- Seq(value), ListQuery(aggregate, numCols =
aggregate.output.length)))
+ val sparkPlan = QueryExecution.prepareExecutedPlan(sparkSession,
aggregate)
+ val values = SubqueryExec(name, sparkPlan)
+ DynamicPruningExpression(InSubqueryExec(value, values, exprId))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 9c90e0105a42..b27122a8de2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -310,10 +310,10 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
|""".stripMargin
val expected_pattern1 =
- "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN
subquery#x"
+ "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN
dynamicpruning#x"
val expected_pattern2 =
"PartitionFilters: \\[isnotnull\\(k#xL\\),
dynamicpruningexpression\\(k#xL " +
- "IN subquery#x\\)\\]"
+ "IN dynamicpruning#x\\)\\]"
val expected_pattern3 =
"Location: InMemoryFileIndex
\\[\\S*org.apache.spark.sql.ExplainSuite" +
"/df2/\\S*, ... 99 entries\\]"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]