This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ed702c0db71a [SPARK-51812][SQL] Remove redundant parameters of some methods in `QueryExecution` ed702c0db71a is described below commit ed702c0db71a2d185e9d56567375616170a1d6af Author: panbingkun <panbing...@apache.org> AuthorDate: Wed Apr 16 19:56:57 2025 +0800 [SPARK-51812][SQL] Remove redundant parameters of some methods in `QueryExecution` ### What changes were proposed in this pull request? The pr aims to remove redundant parameters of some methods in `QueryExecution`, includes: - def createSparkPlan(<del>sparkSession: SparkSession</del>, planner: SparkPlanner, plan: LogicalPlan): SparkPlan - def prepareExecutedPlan(<del>sparkSession: SparkSession</del>, plan: LogicalPlan, context: AdaptiveExecutionContext): SparkPlan ### Why are the changes needed? - The parameters `sparkSession` of the method `QueryExecution#createSparkPlan` are not actually used in the method body - The parameters `sparkSession` of the method `QueryExecution#prepareExecutedPlan` is actually the attribute `session` of parameter `context` These have brought `misunderstanding` and `burden` on the developers. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50598 from panbingkun/SPARK-51812. Authored-by: panbingkun <panbing...@apache.org> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 11 +++++------ .../sql/execution/adaptive/InsertAdaptiveSparkPlan.scala | 2 +- .../adaptive/PlanAdaptiveDynamicPruningFilters.scala | 4 +--- .../execution/dynamicpruning/PlanDynamicPruningFilters.scala | 3 +-- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 87cafa58d5fa..a43c1cc0177d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -209,7 +209,7 @@ class QueryExecution( executePhase(QueryPlanningTracker.PLANNING) { // Clone the logical plan here, in case the planner rules change the states of the logical // plan. - QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone()) + QueryExecution.createSparkPlan(planner, optimizedPlan.clone()) } } @@ -574,7 +574,6 @@ object QueryExecution { * Note that the returned physical plan still needs to be prepared for execution. */ def createSparkPlan( - sparkSession: SparkSession, planner: SparkPlanner, plan: LogicalPlan): SparkPlan = { // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, @@ -594,7 +593,7 @@ object QueryExecution { * [[SparkPlan]] for execution. */ def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = { - val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone()) + val sparkPlan = createSparkPlan(spark.sessionState.planner, plan.clone()) prepareExecutedPlan(spark, sparkPlan) } @@ -603,11 +602,11 @@ object QueryExecution { * This method is only called by [[PlanAdaptiveDynamicPruningFilters]]. */ def prepareExecutedPlan( - session: SparkSession, plan: LogicalPlan, context: AdaptiveExecutionContext): SparkPlan = { - val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone()) - val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true) + val sparkPlan = createSparkPlan(context.session.sessionState.planner, plan.clone()) + val preparationRules = + preparations(context.session, Option(InsertAdaptiveSparkPlan(context)), true) prepareForExecution(preparationRules, sparkPlan.clone()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 73fc9b1fe4e2..2855f902a850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -153,7 +153,7 @@ case class InsertAdaptiveSparkPlan( // Apply the same instance of this rule to sub-queries so that sub-queries all share the // same `stageCache` for Exchange reuse. this.applyInternal( - QueryExecution.createSparkPlan(adaptiveExecutionContext.session, + QueryExecution.createSparkPlan( adaptiveExecutionContext.session.sessionState.planner, plan.clone()), true) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala index 77c180b18aee..751cfe5b7bb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala @@ -74,9 +74,7 @@ case class PlanAdaptiveDynamicPruningFilters( val aliases = indices.map(idx => Alias(buildKeys(idx), buildKeys(idx).toString)()) val aggregate = Aggregate(aliases, aliases, buildPlan) - val session = adaptivePlan.context.session - val sparkPlan = QueryExecution.prepareExecutedPlan( - session, aggregate, adaptivePlan.context) + val sparkPlan = QueryExecution.prepareExecutedPlan(aggregate, adaptivePlan.context) assert(sparkPlan.isInstanceOf[AdaptiveSparkPlanExec]) val newAdaptivePlan = sparkPlan.asInstanceOf[AdaptiveSparkPlanExec] val values = SubqueryExec(name, newAdaptivePlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala index 5f5a9e188532..059729d86bfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala @@ -55,8 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) { case DynamicPruningSubquery( value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) => - val sparkPlan = QueryExecution.createSparkPlan( - sparkSession, sparkSession.sessionState.planner, buildPlan) + val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan) // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is // the first to be applied (apart from `InsertAdaptiveSparkPlan`). val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty && --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org