This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f34563442a7c [SPARK-52919][SQL] Fix DSv2 Join pushdown to use previously aliased column
f34563442a7c is described below

commit f34563442a7c5e3a8078f2bcc55a8478e5e0a11b
Author: Petar Vasiljevic <petar.vasilje...@databricks.com>
AuthorDate: Wed Jul 23 17:13:14 2025 +0800

    [SPARK-52919][SQL] Fix DSv2 Join pushdown to use previously aliased column
    
    ### What changes were proposed in this pull request?
    There is a bug in DSv2 join pushdown where we populate the `AttributeMap` with aliased names. If the alias is null, we would not populate the map for that attribute, and the original column name would be used.
    
    This was wrong because the column could have been aliased previously, in which case no new alias is needed for the new join. Instead of using `colName` (which carries the up-to-date column name), the map had no entry for the attribute, so we would return the original column name from the scan, i.e. the name from before join pushdown even happened.
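    
    As a minimal, hypothetical sketch of the fix (Attr, ColumnWithAlias, and buildRenameMap below are simplified stand-ins, not the actual Catalyst classes), the mapping now records an entry for every attribute and falls back to `colName()` when no new alias was assigned:
    
        // Illustrative stand-ins for AttributeReference and the connector's
        // column/alias pair used by the pushdown rule.
        case class Attr(name: String) { def withName(n: String): Attr = Attr(n) }
        case class ColumnWithAlias(colName: String, alias: String) // alias may be null
    
        def buildRenameMap(output: Seq[Attr], cols: Seq[ColumnWithAlias]): Map[Attr, Attr] =
          output.zip(cols).map { case (attr, c) =>
            // Before the fix, attributes with a null alias were skipped entirely,
            // so lookups fell back to the pre-pushdown scan column name
            // instead of the up-to-date c.colName.
            if (c.alias != null) attr -> attr.withName(c.alias)
            else attr -> attr.withName(c.colName)
          }.toMap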
    
    ### Why are the changes needed?
    Bug in DSv2 join pushdown.
    
    ### Does this PR introduce _any_ user-facing change?
    No, it is a bug fix.
    
    ### How was this patch tested?
    Added a new test to `JDBCV2JoinPushdownIntegrationSuiteBase`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51622 from PetarVasiljevic-DB/use_previously_aliased_columnName_in_join_pushdown_condition.
    
    Authored-by: Petar Vasiljevic <petar.vasilje...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/V2ScanRelationPushDown.scala    |  8 +++--
 .../JDBCV2JoinPushdownIntegrationSuiteBase.scala   | 40 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index f7f1c4f522c2..a91612625be3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -179,8 +179,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
         node.output
           .zip(leftSideRequiredColumnsWithAliases ++ rightSideRequiredColumnsWithAliases)
           .collect {
-            case (attr, columnWithAlias) if columnWithAlias.alias() != null =>
-              (attr, attr.withName(columnWithAlias.alias()))
+            case (attr, columnWithAlias) =>
+              if (columnWithAlias.alias() != null) {
+                (attr, attr.withName(columnWithAlias.alias()))
+              } else {
+                (attr, attr.withName(columnWithAlias.colName()))
+              }
           }
           .toMap
       )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
index 27ca84bc2a2a..6e2898bfdf72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@@ -600,4 +600,44 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       checkAnswer(df, rows)
     }
   }
+
+  test("Test condition with aliased column") {
+    // After the first join, the columns are aliased because we are doing a self join in the CTE.
+    // The second join joins on an aliased column, so the aliased name should be used in the
+    // generated SQL query.
+    val sqlQuery = s"""
+      |WITH ws_wh AS (
+      |    SELECT
+      |        ws1.ID,
+      |        ws1.AMOUNT wh1,
+      |        ws2.AMOUNT wh2
+      |    FROM
+      |        $catalogAndNamespace.$casedJoinTableName1 ws1,
+      |        $catalogAndNamespace.$casedJoinTableName1 ws2
+      |    WHERE
+      |        ws1.ID = ws2.ID
+      |        AND ws1.AMOUNT <> ws2.AMOUNT
+      |)
+      |SELECT
+      |   NEXT_ID
+      |FROM
+      |   $catalogAndNamespace.$casedJoinTableName2,
+      |   ws_wh
+      |WHERE
+      |   NEXT_ID = ws_wh.ID
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    assert(!rows.isEmpty)
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      checkJoinPushed(df)
+      checkAnswer(df, rows)
+    }
+  }
 }


