This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a77713081a8 [SPARK-46285][SQL] Add `foreachWithSubqueries`
a77713081a8 is described below

commit a77713081a8065a5feaa3b438a79fdeb6a4b6782
Author: Rui Wang <[email protected]>
AuthorDate: Sun Dec 10 14:00:15 2023 -0800

    [SPARK-46285][SQL] Add `foreachWithSubqueries`
    
    ### What changes were proposed in this pull request?
    
    We can have a `foreachWithSubqueries` which also traverses the subqueries in the query plan.
    
    ### Why are the changes needed?
    
    Add a new way to access subqueries in the query plan.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #44206 from amaliujia/foreachsubqueries.
    
    Authored-by: Rui Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala     | 11 +++++++++++
 .../apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala  | 13 +++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 9656a93575d..ef7cd7401f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -534,6 +534,17 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
     transformDownWithPruning(cond, ruleId)(g)
   }
 
+  /**
+   * A variant of [[foreach]] which considers plan nodes inside subqueries as well.
+   */
+  def foreachWithSubqueries(f: PlanType => Unit): Unit = {
+    def actualFunc(plan: PlanType): Unit = {
+      f(plan)
+      plan.subqueries.foreach(_.foreachWithSubqueries(f))
+    }
+    foreach(actualFunc)
+  }
+
   /**
   * A variant of `collect`. This method not only apply the given function to all elements in this
   * plan, also considering all the plans in its (nested) subqueries
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 3eba9eebc3d..31f7e07143c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -145,4 +146,16 @@ class LogicalPlanSuite extends SparkFunSuite {
     assert(query.where(Literal.FalseLiteral).maxRows.contains(0))
     assert(query.where(Literal.FalseLiteral).maxRowsPerPartition.contains(0))
   }
+
+  test("SPARK-46285: foreachWithSubqueries") {
+    val input = UnresolvedRelation(Seq("subquery_table"))
+    val input2 = UnresolvedRelation(Seq("t"))
+    val plan = Filter(Exists(input), input2)
+    val tableNames = scala.collection.mutable.Set[String]()
+    plan.foreachWithSubqueries {
+      case e: UnresolvedRelation => tableNames.add(e.name)
+      case _ =>
+    }
+    assert(tableNames.contains("subquery_table"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to