This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 64441ee94520 [SPARK-51831][SQL][4.0] Column pruning with existsJoin for Datasource V2
64441ee94520 is described below

commit 64441ee945208c9b17de6f4f72c1fdc69e9bc4e7
Author: jackylee-ch <[email protected]>
AuthorDate: Thu Sep 25 11:38:07 2025 -0700

    [SPARK-51831][SQL][4.0] Column pruning with existsJoin for Datasource V2
    
    ### Why are the changes needed?
    Recently, while testing TPC-DS queries against DataSource V2, I noticed
    that column pruning does not happen for subqueries of the form
    EXISTS (SELECT * FROM ... WHERE ...). As a result, the scan reads all
    columns instead of only the required ones. The issue is reproducible in
    queries such as Q10, Q16, Q35, Q69, and Q94.
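
    A minimal reproduction sketch (the table layout, path, and query below are
    illustrative assumptions, not the exact TPC-DS setup; Parquet must also be
    routed through DataSource V2, e.g. by removing "parquet" from
    spark.sql.sources.useV1SourceList):
    ```
    // Hypothetical wide table; only id and col1 end up being needed.
    spark.range(10)
      .selectExpr("id", "id as col1", "id as col2", "id as col3", "id as col4",
        "id as col5", "id as col6", "id as col7", "id as col8", "id as col9")
      .write.parquet("/tmp/t1")
    spark.read.parquet("/tmp/t1").createOrReplaceTempView("t1")

    // The subquery selects *, yet only id (join key) and col1 (filter) are
    // used, so the scan should read just those two columns.
    spark.sql("""
      select sum(id) from t1
      where exists (select * from t1 t2 where t1.id = t2.id and t2.col1 > 5)
    """).explain()
    ```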
    
    This PR inserts a `Project` into the subquery plan, ensuring that only
    the columns referenced by the EXISTS join condition are read from
    DataSource V2. The shape of the rewrite is sketched below.
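
    Schematically, the rewrite changes the subquery's plan shape as follows
    (an illustrative sketch of the logical plan, not actual optimizer output):
    ```
    Before: the EXISTS subquery exposes every column, so nothing is pruned
      Exists (joinCond: outer.id = t2.id)
      +- Filter (col1 > 5)
         +- Relation t1[id, col1, ..., col9]

    After: a Project keeps only the join-condition columns; column pruning
    then narrows the scan to id plus the filter column col1
      Exists (joinCond: outer.id = t2.id)
      +- Project [id]
         +- Filter (col1 > 5)
            +- Relation t1[id, col1, ..., col9]
    ```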
    
    Below are the plan changes for this PR.
    Before this PR:
    ```
    BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-76b1f4fc-2e84-485c-aade-a62168987baf/t1[id#32L, col1#33L, col2#34L, col3#35L, col4#36L, col5#37L, col6#38L, col7#39L, col8#40L, col9#41L] ParquetScan DataFilters: [isnotnull(col1#33L), (col1#33L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-76..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(c [...]
    ```
    After this PR:
    ```
    BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-cd4b50d9-1643-40e6-a8e1-1429d3213411/t1[id#133L, col1#134L] ParquetScan DataFilters: [isnotnull(col1#134L), (col1#134L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-cd..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(col1), GreaterThan(col1,5)], PushedGroupBy: [], ReadSchema: struct<id:bigint, [...]
    ```
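
    One way to verify the pruning programmatically (a sketch; `df` here stands
    for the DataFrame of the query above, and BatchScanExec is the DSv2 scan
    node):
    ```
    import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

    // Find the first DSv2 scan in the executed plan and print the schema it
    // will actually read; after this PR it should contain only id and col1.
    df.queryExecution.executedPlan
      .collectFirst { case b: BatchScanExec => b }
      .foreach(b => println(b.scan.readSchema()))
    ```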
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Newly added unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52458 from jackylee-ch/SPARK-51831-4.0.
    
    Authored-by: jackylee-ch <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 30 ++++++++++++++++++----
 .../execution/datasources/SchemaPruningSuite.scala | 16 ++++++++++++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa972c815591..7a8deb10f1a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -357,6 +357,15 @@ abstract class Optimizer(catalogManager: CatalogManager)
         case other => other
       }
     }
+
+    private def optimizeSubquery(s: SubqueryExpression): SubqueryExpression = {
+      val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
+      // At this point we have an optimized subquery plan that we are going to attach
+      // to this subquery expression. Here we can safely remove any top level sort
+      // in the plan as tuples produced by a subquery are un-ordered.
+      s.withNewPlan(removeTopLevelSort(newPlan))
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
       _.containsPattern(PLAN_EXPRESSION), ruleId) {
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
@@ -411,12 +420,23 @@ abstract class Optimizer(catalogManager: CatalogManager)
         s.withNewPlan(
           if (needTopLevelProject) newPlan else newPlan.child
         )
+      case s: Exists =>
+        // For an EXISTS join, the subquery might be written as "SELECT * FROM ...".
+        // If we optimize the subquery directly, column pruning may not be applied
+        // effectively. To address this, we add an extra Project node that selects
+        // only the columns referenced in the EXISTS join condition.
+        // This ensures that column pruning can be performed correctly
+        // during subquery optimization.
+        val selectedReferences =
+          s.plan.output.filter(s.joinCond.flatMap(_.references).contains)
+        val newPlan = if (selectedReferences.nonEmpty) {
+          s.withNewPlan(Project(selectedReferences, s.plan))
+        } else {
+          s
+        }
+        optimizeSubquery(newPlan)
       case s: SubqueryExpression =>
-        val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
-        // At this point we have an optimized subquery plan that we are going to attach
-        // to this subquery expression. Here we can safely remove any top level sort
-        // in the plan as tuples produced by a subquery are un-ordered.
-        s.withNewPlan(removeTopLevelSort(newPlan))
+        optimizeSubquery(s)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 0a0b23d1e601..269990d7d14e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -658,6 +658,7 @@ abstract class SchemaPruningSuite
            |where not exists (select null from employees e where e.name.first = c.name.first
             |  and e.employer.name = c.employer.company.name)
             |""".stripMargin)
+        // TODO: SPARK-51381: Fix the schema pruning for nested columns
         checkScan(query,
           "struct<name:struct<first:string,middle:string,last:string>," +
            "employer:struct<id:int,company:struct<name:string,address:string>>>",
@@ -668,6 +669,21 @@ abstract class SchemaPruningSuite
     }
   }
 
+  testSchemaPruning("SPARK-51831: Column pruning with exists Join") {
+    withContacts {
+      val query = sql(
+        """
+          |select sum(t1.id) as sum_id
+          |from contacts as t1
+          |where exists(select * from contacts as t2 where t1.id == t2.id)
+          |""".stripMargin)
+      checkScan(query,
+        "struct<id:int>",
+        "struct<id:int>")
+      checkAnswer(query, Row(6))
+    }
+  }
+
  protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
    test(s"Spark vectorized reader - without partition data column - $testName") {
       withSQLConf(vectorizedReaderEnabledKey -> "true") {

