This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f34563442a7c [SPARK-52919][SQL] Fix DSv2 Join pushdown to use previously aliased column
f34563442a7c is described below

commit f34563442a7c5e3a8078f2bcc55a8478e5e0a11b
Author: Petar Vasiljevic <petar.vasilje...@databricks.com>
AuthorDate: Wed Jul 23 17:13:14 2025 +0800

    [SPARK-52919][SQL] Fix DSv2 Join pushdown to use previously aliased column

    ### What changes were proposed in this pull request?

    There is a bug in DSv2 join pushdown where we populate the `AttributeMap` with aliased names. If the alias was null, we did not populate the map for that attribute, and the original column name was used. This is wrong, because the column could have been aliased by a previous pushdown, in which case no new alias is assigned in the new join. So instead of using `colName` (which carries the up-to-date column name), we had no entry in the map and fell back to the original column name from the scan, i.e. the name from before join pushdown even happened.
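    To make the failure mode concrete, here is a minimal standalone sketch (plain Scala, no
    Spark dependencies). `ColumnWithAlias`, the sample column names, and the plain `Map`s are
    hypothetical simplifications of the real `Attribute`/`AttributeMap` machinery, intended
    only to show why every column needs an entry, not just the newly aliased ones:

    ```scala
    // Loose stand-in for the DSv2 column descriptor: colName is the current
    // (possibly previously aliased) name, alias is the new alias or null.
    final case class ColumnWithAlias(colName: String, alias: String)

    object AliasMapSketch extends App {
      // Attribute names as they existed in the original scan output.
      val output = Seq("ID", "AMOUNT")
      // "AMOUNT" was renamed to "AMOUNT_1" by an earlier join pushdown, so this
      // round sees colName = "AMOUNT_1" and no new alias (alias = null).
      val columns = Seq(ColumnWithAlias("ID", "ID_1"), ColumnWithAlias("AMOUNT_1", null))

      // Before the fix: only columns with a non-null alias get an entry, so a
      // lookup for "AMOUNT" misses and callers fall back to the stale scan name.
      val buggy: Map[String, String] = output.zip(columns).collect {
        case (attr, c) if c.alias != null => attr -> c.alias
      }.toMap

      // After the fix: every column gets an entry; un-aliased columns map to
      // colName, which already carries the name chosen by earlier pushdowns.
      val fixed: Map[String, String] = output.zip(columns).map {
        case (attr, c) => attr -> (if (c.alias != null) c.alias else c.colName)
      }.toMap

      assert(buggy.getOrElse("AMOUNT", "AMOUNT") == "AMOUNT") // stale scan name wins
      assert(fixed("AMOUNT") == "AMOUNT_1")                   // previous alias preserved
    }
    ```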
    ### Why are the changes needed?

    This fixes a bug in DSv2 join pushdown.

    ### Does this PR introduce _any_ user-facing change?

    No, it is a bug fix.

    ### How was this patch tested?

    Added a new test in the base suite.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51622 from PetarVasiljevic-DB/use_previously_aliased_columnName_in_join_pushdown_condition.

    Authored-by: Petar Vasiljevic <petar.vasilje...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/V2ScanRelationPushDown.scala    |  8 +++--
 .../JDBCV2JoinPushdownIntegrationSuiteBase.scala   | 40 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index f7f1c4f522c2..a91612625be3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -179,8 +179,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       node.output
         .zip(leftSideRequiredColumnsWithAliases ++ rightSideRequiredColumnsWithAliases)
         .collect {
-          case (attr, columnWithAlias) if columnWithAlias.alias() != null =>
-            (attr, attr.withName(columnWithAlias.alias()))
+          case (attr, columnWithAlias) =>
+            if (columnWithAlias.alias() != null) {
+              (attr, attr.withName(columnWithAlias.alias()))
+            } else {
+              (attr, attr.withName(columnWithAlias.colName()))
+            }
         }
         .toMap
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
index 27ca84bc2a2a..6e2898bfdf72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@@ -600,4 +600,44 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
       checkAnswer(df, rows)
     }
   }
+
+  test("Test condition with aliased column") {
+    // After the first join, columns will be aliased because we are doing a self join in a CTE.
+    // The second join joins on an aliased column, so the aliased value should be used in the
+    // generated SQL query.
+    val sqlQuery = s"""
+      |WITH ws_wh AS (
+      |  SELECT
+      |    ws1.ID,
+      |    ws1.AMOUNT wh1,
+      |    ws2.AMOUNT wh2
+      |  FROM
+      |    $catalogAndNamespace.$casedJoinTableName1 ws1,
+      |    $catalogAndNamespace.$casedJoinTableName1 ws2
+      |  WHERE
+      |    ws1.ID = ws2.ID
+      |    AND ws1.AMOUNT <> ws2.AMOUNT
+      |)
+      |SELECT
+      |  NEXT_ID
+      |FROM
+      |  $catalogAndNamespace.$casedJoinTableName2,
+      |  ws_wh
+      |WHERE
+      |  NEXT_ID = ws_wh.ID
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    assert(!rows.isEmpty)
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      checkJoinPushed(df)
+      checkAnswer(df, rows)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org