This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ed702c0db71a [SPARK-51812][SQL] Remove redundant parameters of some methods in `QueryExecution`
ed702c0db71a is described below

commit ed702c0db71a2d185e9d56567375616170a1d6af
Author: panbingkun <panbing...@apache.org>
AuthorDate: Wed Apr 16 19:56:57 2025 +0800

    [SPARK-51812][SQL] Remove redundant parameters of some methods in `QueryExecution`
    
    ### What changes were proposed in this pull request?
    This PR aims to remove redundant parameters from the following methods in `QueryExecution` (a sketch of the resulting signatures follows the list):
    - def createSparkPlan(<del>sparkSession: SparkSession</del>, planner: SparkPlanner, plan: LogicalPlan): SparkPlan
    - def prepareExecutedPlan(<del>sparkSession: SparkSession</del>, plan: LogicalPlan, context: AdaptiveExecutionContext): SparkPlan
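
    A minimal sketch of the resulting signatures, matching the diff below (bodies elided):

    ```scala
    // Sketch only: the post-change signatures, with bodies elided.
    // `createSparkPlan` no longer takes a SparkSession, and
    // `prepareExecutedPlan` reaches the session via `context.session`.
    def createSparkPlan(
        planner: SparkPlanner,
        plan: LogicalPlan): SparkPlan = ???

    def prepareExecutedPlan(
        plan: LogicalPlan,
        context: AdaptiveExecutionContext): SparkPlan = ???
    ```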
    
    ### Why are the changes needed?
    - The parameter `sparkSession` of the method `QueryExecution#createSparkPlan` is not actually used in the method body.
    - The parameter `sparkSession` of the method `QueryExecution#prepareExecutedPlan` is redundant: it is always the `session` attribute of the parameter `context`.
    These redundant parameters have misled and burdened developers; the resulting call-site simplification is sketched below.
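
    As an illustration, the call site in `PlanAdaptiveDynamicPruningFilters` (see the hunk below) simplifies roughly as follows; the two snippets are alternative before/after versions of the same line:

    ```scala
    // Before: the session was extracted and passed alongside the context,
    // even though it is the same object as `adaptivePlan.context.session`.
    val session = adaptivePlan.context.session
    val sparkPlan = QueryExecution.prepareExecutedPlan(session, aggregate, adaptivePlan.context)

    // After: the context alone is enough; the method reads `context.session` internally.
    val sparkPlan = QueryExecution.prepareExecutedPlan(aggregate, adaptivePlan.context)
    ```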
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50598 from panbingkun/SPARK-51812.
    
    Authored-by: panbingkun <panbing...@apache.org>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 11 +++++------
 .../sql/execution/adaptive/InsertAdaptiveSparkPlan.scala      |  2 +-
 .../adaptive/PlanAdaptiveDynamicPruningFilters.scala          |  4 +---
 .../execution/dynamicpruning/PlanDynamicPruningFilters.scala  |  3 +--
 4 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 87cafa58d5fa..a43c1cc0177d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -209,7 +209,7 @@ class QueryExecution(
     executePhase(QueryPlanningTracker.PLANNING) {
       // Clone the logical plan here, in case the planner rules change the states of the logical
       // plan.
-      QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
+      QueryExecution.createSparkPlan(planner, optimizedPlan.clone())
     }
   }
 
@@ -574,7 +574,6 @@ object QueryExecution {
   * Note that the returned physical plan still needs to be prepared for execution.
    */
   def createSparkPlan(
-      sparkSession: SparkSession,
       planner: SparkPlanner,
       plan: LogicalPlan): SparkPlan = {
     // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
@@ -594,7 +593,7 @@ object QueryExecution {
    * [[SparkPlan]] for execution.
    */
   def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {
-    val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())
+    val sparkPlan = createSparkPlan(spark.sessionState.planner, plan.clone())
     prepareExecutedPlan(spark, sparkPlan)
   }
 
@@ -603,11 +602,11 @@ object QueryExecution {
    * This method is only called by [[PlanAdaptiveDynamicPruningFilters]].
    */
   def prepareExecutedPlan(
-      session: SparkSession,
       plan: LogicalPlan,
       context: AdaptiveExecutionContext): SparkPlan = {
-    val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())
-    val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
+    val sparkPlan = createSparkPlan(context.session.sessionState.planner, plan.clone())
+    val preparationRules =
+      preparations(context.session, Option(InsertAdaptiveSparkPlan(context)), true)
     prepareForExecution(preparationRules, sparkPlan.clone())
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 73fc9b1fe4e2..2855f902a850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -153,7 +153,7 @@ case class InsertAdaptiveSparkPlan(
     // Apply the same instance of this rule to sub-queries so that sub-queries all share the
     // same `stageCache` for Exchange reuse.
     this.applyInternal(
-      QueryExecution.createSparkPlan(adaptiveExecutionContext.session,
+      QueryExecution.createSparkPlan(
         adaptiveExecutionContext.session.sessionState.planner, plan.clone()), true)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
index 77c180b18aee..751cfe5b7bb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala
@@ -74,9 +74,7 @@ case class PlanAdaptiveDynamicPruningFilters(
           val aliases = indices.map(idx => Alias(buildKeys(idx), buildKeys(idx).toString)())
           val aggregate = Aggregate(aliases, aliases, buildPlan)
 
-          val session = adaptivePlan.context.session
-          val sparkPlan = QueryExecution.prepareExecutedPlan(
-            session, aggregate, adaptivePlan.context)
+          val sparkPlan = QueryExecution.prepareExecutedPlan(aggregate, adaptivePlan.context)
           assert(sparkPlan.isInstanceOf[AdaptiveSparkPlanExec])
           val newAdaptivePlan = sparkPlan.asInstanceOf[AdaptiveSparkPlanExec]
           val values = SubqueryExec(name, newAdaptivePlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 5f5a9e188532..059729d86bfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -55,8 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
     plan.transformAllExpressionsWithPruning(_.containsPattern(DYNAMIC_PRUNING_SUBQUERY)) {
       case DynamicPruningSubquery(
           value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) =>
-        val sparkPlan = QueryExecution.createSparkPlan(
-          sparkSession, sparkSession.sessionState.planner, buildPlan)
+        val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan)
         // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&

