This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 64441ee94520 [SPARK-51831][SQL][4.0] Column pruning with existsJoin for Datasource V2
64441ee94520 is described below
commit 64441ee945208c9b17de6f4f72c1fdc69e9bc4e7
Author: jackylee-ch <[email protected]>
AuthorDate: Thu Sep 25 11:38:07 2025 -0700
[SPARK-51831][SQL][4.0] Column pruning with existsJoin for Datasource V2
### Why are the changes needed?
Recently, I have been testing TPC-DS queries based on DataSource V2 and noticed that column pruning does not occur in scenarios involving EXISTS (SELECT * FROM ... WHERE ...). As a result, the scan ends up reading all columns instead of just the required ones. This issue is reproducible in queries such as Q10, Q16, Q35, Q69, and Q94.
This PR inserts a `Project` into the `Subquery`, ensuring that only the columns referenced by the join condition are read from DataSource V2.
Below are the plan changes introduced by this PR.
Before this PR
```
BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-76b1f4fc-2e84-485c-aade-a62168987baf/t1[id#32L, col1#33L, col2#34L, col3#35L, col4#36L, col5#37L, col6#38L, col7#39L, col8#40L, col9#41L] ParquetScan DataFilters: [isnotnull(col1#33L), (col1#33L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-76..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(c [...]
```
After this PR
```
BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-cd4b50d9-1643-40e6-a8e1-1429d3213411/t1[id#133L, col1#134L] ParquetScan DataFilters: [isnotnull(col1#134L), (col1#134L > 5)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-cd..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(col1), GreaterThan(col1,5)], PushedGroupBy: [], ReadSchema: struct<id:bigint, [...]
```
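For reference, a scan like the one above can be reproduced with a sketch along these lines (the path, view name, and column layout are illustrative rather than the exact test setup, and `spark.sql.sources.useV1SourceList` is cleared on the assumption that this routes parquet reads through DataSource V2):
```scala
// Minimal repro sketch, assuming a running SparkSession `spark`:
// a wide Parquet table read through DataSource V2, filtered by an
// EXISTS subquery that only references `id` and `col1`.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

val wideCols = "id" +: (1 to 9).map(i => s"id AS col$i")
spark.range(10).selectExpr(wideCols: _*)
  .write.mode("overwrite").parquet("/tmp/t1")
spark.read.parquet("/tmp/t1").createOrReplaceTempView("t1")

spark.sql(
  """
    |SELECT sum(t1.id)
    |FROM t1
    |WHERE EXISTS (SELECT * FROM t1 t2 WHERE t2.col1 > 5 AND t1.id = t2.id)
    |""".stripMargin).explain()
// Before this PR the BatchScan lists id and col1 through col9; after it,
// the ReadSchema narrows to roughly struct<id:bigint,col1:bigint>.
```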
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Newly added unit test in `SchemaPruningSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52458 from jackylee-ch/SPARK-51831-4.0.
Authored-by: jackylee-ch <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 30 ++++++++++++++++++----
.../execution/datasources/SchemaPruningSuite.scala | 16 ++++++++++++
2 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa972c815591..7a8deb10f1a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -357,6 +357,15 @@ abstract class Optimizer(catalogManager: CatalogManager)
case other => other
}
}
+
+  private def optimizeSubquery(s: SubqueryExpression): SubqueryExpression = {
+    val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
+    // At this point we have an optimized subquery plan that we are going to attach
+    // to this subquery expression. Here we can safely remove any top level sort
+    // in the plan as tuples produced by a subquery are un-ordered.
+    s.withNewPlan(removeTopLevelSort(newPlan))
+  }
+
    def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
      _.containsPattern(PLAN_EXPRESSION), ruleId) {
      // Do not optimize DPP subquery, as it was created from optimized plan and we should not
@@ -411,12 +420,23 @@ abstract class Optimizer(catalogManager: CatalogManager)
          s.withNewPlan(
            if (needTopLevelProject) newPlan else newPlan.child
          )
+      case s: Exists =>
+        // For an EXISTS join, the subquery might be written as "SELECT * FROM ...".
+        // If we optimize the subquery directly, column pruning may not be applied
+        // effectively. To address this, we add an extra Project node that selects
+        // only the columns referenced in the EXISTS join condition.
+        // This ensures that column pruning can be performed correctly
+        // during subquery optimization.
+        val selectedReferences =
+          s.plan.output.filter(s.joinCond.flatMap(_.references).contains)
+        val newPlan = if (selectedReferences.nonEmpty) {
+          s.withNewPlan(Project(selectedReferences, s.plan))
+        } else {
+          s
+        }
+        optimizeSubquery(newPlan)
      case s: SubqueryExpression =>
-        val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
-        // At this point we have an optimized subquery plan that we are going to attach
-        // to this subquery expression. Here we can safely remove any top level sort
-        // in the plan as tuples produced by a subquery are un-ordered.
-        s.withNewPlan(removeTopLevelSort(newPlan))
+        optimizeSubquery(s)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 0a0b23d1e601..269990d7d14e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -658,6 +658,7 @@ abstract class SchemaPruningSuite
        |where not exists (select null from employees e where e.name.first = c.name.first
        | and e.employer.name = c.employer.company.name)
        |""".stripMargin)
+      // TODO: SPARK-51381: Fix the schema pruning for nested columns
      checkScan(query,
        "struct<name:struct<first:string,middle:string,last:string>," +
        "employer:struct<id:int,company:struct<name:string,address:string>>>",
@@ -668,6 +669,21 @@ abstract class SchemaPruningSuite
}
}
+  testSchemaPruning("SPARK-51831: Column pruning with exists Join") {
+    withContacts {
+      val query = sql(
+        """
+          |select sum(t1.id) as sum_id
+          |from contacts as t1
+          |where exists(select * from contacts as t2 where t1.id == t2.id)
+          |""".stripMargin)
+      checkScan(query,
+        "struct<id:int>",
+        "struct<id:int>")
+      checkAnswer(query, Row(6))
+    }
+  }
+
  protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
    test(s"Spark vectorized reader - without partition data column - $testName") {
      withSQLConf(vectorizedReaderEnabledKey -> "true") {