This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ebdc4ae6714 [SPARK-55794][SQL] Always alias `OuterReference`s
4ebdc4ae6714 is described below

commit 4ebdc4ae67147d58d1ae807c304b418a7771d7e2
Author: Mihailo Timotic <[email protected]>
AuthorDate: Mon Mar 2 13:19:38 2026 -0800

    [SPARK-55794][SQL] Always alias `OuterReference`s
    
    ### What changes were proposed in this pull request?
    In this PR I propose that we always alias `OuterReference`s in `AliasResolution.resolve`, instead of passing them through unchanged as `NamedExpression`s.
    
    ### Why are the changes needed?
    These changes are needed for 2 main reasons: to avoid potential issues 
with exposing raw outer references and their expression ids, and to provide 
compatibility between fixed-point and single-pass analyzers.
    
    For example, in a query like:
    ```
    table t
    |> where exists (
        table other
        |> extend t.x
        |> select * except (a, b))
    ```
    before this change, the output will be:
    ```
    Filter exists#x [x#1]
    :  +- Project [x#1]
    :     +- Project [a#3, b#4, outer(x#1)]
    :        +- SubqueryAlias spark_catalog.default.other
    :           +- Relation spark_catalog.default.other[a#3,b#4] json
    +- PipeOperator
       +- SubqueryAlias spark_catalog.default.t
          +- Relation spark_catalog.default.t[x#1,y#2] csv
    ```
    Here the output of the subquery is exactly the same attribute as the outer 
reference (`x#1`). This can potentially cause query failures or correctness 
issues, but at the moment it only presents as a compatibility issue between 
fixed-point and single-pass analyzers. After this change the inner `Project` outputs an aliased reference (`outer(x#1) AS x`) instead.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added new test cases + existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #54576 from mihailotim-db/mihailo-timotic_data/fix_outer_ref.
    
    Authored-by: Mihailo Timotic <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../sql/catalyst/analysis/AliasResolution.scala    |  8 +-
 .../resolver/AggregateExpressionResolver.scala     | 13 ++-
 .../catalyst/analysis/resolver/AliasResolver.scala | 16 +---
 .../catalyst/expressions/namedExpressions.scala    | 51 +++++++++++-
 .../catalyst/analysis/ResolveAliasesSuite.scala    | 97 ++++++++++++++++++++++
 .../analyzer-results/pipe-operators.sql.out        |  2 +-
 6 files changed, 165 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
index c16170dd84dc..fd457f2c868d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.{
   Generator,
   GeneratorOuter,
   Literal,
-  NamedExpression
+  NamedExpression,
+  OuterReference
 }
 import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ALIAS
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS}
@@ -50,6 +51,11 @@ object AliasResolution {
   def resolve(u: UnresolvedAlias): Expression = {
     val UnresolvedAlias(child, optGenAliasFunc) = u
     child match {
+      case outerReference: OuterReference =>
+        val aliasName = outerReference
+          
.getTagValue(OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE)
+          .getOrElse(toPrettySQL(outerReference.e))
+        Alias(outerReference, aliasName)()
       case ne: NamedExpression => ne
       case go @ GeneratorOuter(g: Generator) if g.resolved => MultiAlias(go, 
Nil)
       case e if !e.resolved => u
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
index d98b699349c9..a5966ce06554 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
@@ -148,8 +148,11 @@ class AggregateExpressionResolver(
    *    [[Aggregate.aggregateExpressions]] list. Otherwise, use the [[Alias]] 
from the outer
    *    [[Aggregate]]. This alias will later be injected into the outer 
[[Aggregate]];
    *  - Store the name that needs to be used for the [[OuterReference]] in
-   *    [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] computed based on 
the
-   *    [[AggregateExpression]] without [[OuterReference]] pulled out.
+   *    [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] and
+   *    [[OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE]], 
both computed from the
+   *    pretty-printed original [[AggregateExpression]] (before 
[[OuterReference]] stripping). The
+   *    former overrides [[OuterReference.sql]] and the latter overrides the 
alias name assigned by
+   *    [[AliasResolution.resolve]].
    *  - In case we have an [[AggregateExpression]] inside a [[Sort]] operator, 
we need to handle it
    *    in a special way (see [[handleAggregateExpressionOutsideAggregate]] 
for more details).
    *  - Return the original [[AggregateExpression]] otherwise. This is done to 
stay compatible
@@ -178,9 +181,11 @@ class AggregateExpressionResolver(
 
     resolvedOuterAggregateExpression match {
       case outerReference: OuterReference =>
+        val name = toPrettySQL(aggregateExpression)
+        
outerReference.setTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE, name)
         outerReference.setTagValue(
-          OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE,
-          toPrettySQL(aggregateExpression)
+          OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
+          name
         )
         outerReference
       case other => other
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
index bc02c4bc4572..89258ffea5dd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
@@ -18,13 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis.resolver
 
 import org.apache.spark.sql.catalyst.analysis.{AliasResolution, 
UnresolvedAlias}
-import org.apache.spark.sql.catalyst.expressions.{
-  Alias,
-  AliasHelper,
-  Expression,
-  NamedExpression,
-  OuterReference
-}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, 
Expression, NamedExpression}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
@@ -44,10 +38,8 @@ class AliasResolver(expressionResolver: ExpressionResolver)
   /**
    * Resolves [[UnresolvedAlias]] by resolving its child and computing the 
alias name by calling
    * [[AliasResolution]] on the result. After resolving it, we assign a 
correct exprId to the
-   * resulting [[Alias]]. In case result of the [[AliasResolution]] call is an 
[[OuterReference]],
-   * we create a new [[Alias]] using the [[AutoGeneratedAliasProvider]]. Here 
we allow inner
-   * aliases to persist until the end of single-pass resolution, after which 
they will be removed
-   * in the post-processing phase.
+   * resulting [[Alias]]. Here we allow inner aliases to persist until the end 
of single-pass
+   * resolution, after which they will be removed in the post-processing phase.
    *
    * Resulting [[Alias]] must be added to the list of `availableAliases` in 
the current
    * [[NameScope]].
@@ -68,8 +60,6 @@ class AliasResolver(expressionResolver: ExpressionResolver)
           val resultAlias = 
expressionResolver.getExpressionIdAssigner.mapExpression(alias)
           scopes.current.availableAliases.add(resultAlias.exprId)
           resultAlias
-        case outerReference: OuterReference =>
-          autoGeneratedAliasProvider.newAlias(outerReference)
         case _ =>
           throw QueryCompilationErrors.unsupportedSinglePassAnalyzerFeature(
             s"${resolvedNode.getClass} expression resolution"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index ed06fb2ae05d..ccefdc0999ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -544,12 +544,57 @@ object OuterReference {
    * single-pass implementation (first we extract the [[OuterReference]] and 
then assign the name -
    * in bottom-up manner).
    *
-   * In order to make single-pass and fixed-point implementations compatible 
use earlier computed
-   * name (if defined) for [[OuterReference]] (defined in
-   * `AggregateExpressionResolver.handleOuterAggregateExpression`).
+   * For example, for a query like:
+   * {{{
+   *  SELECT col1 AS alias
+   *  FROM values(1)
+   *  GROUP BY col1
+   *  HAVING (
+   *    SELECT col1 = 1
+   *  )
+   * }}}
+   *
+   * Fixed-point analyzer will output:
+   *
+   * Filter cast(scalar-subquery#x [alias#x] as boolean)
+   * :  +- Project [(outer(alias#x) = 1) AS (outer(col1) = 1)#x]
+   * :     +- OneRowRelation
+   * +- Aggregate [col1#x], [col1#x AS alias#x]
+   *    +- LocalRelation [col1#x]
+   *
+   * Notice that the expression in the subquery is `(outer(alias#x) = 1) AS 
(outer(col1) = 1)`.
+   * This is because the initial underlying expression is `outer(col1) = 1` 
which we alias, but
+   * we later update the actual outer reference to `outer(alias)`.
    */
   val SINGLE_PASS_SQL_STRING_OVERRIDE =
     TreeNodeTag[String]("single_pass_sql_string_override")
+
+  /**
+   * In fixed-point [[OuterReference]] is extracted in 
[[UpdateOuterReferences]] which is invoked
+   * after the alias assignment ([[ResolveAliases]]) which is opposite of how 
it is done in the
+   * single-pass implementation (first we extract the [[OuterReference]] and 
then assign the name -
+   * in bottom-up manner).
+   *
+   * For example, in a query like:
+   * {{{
+   *   SELECT t1a
+   *   FROM t1
+   *   WHERE t1a IN (
+   *                 SELECT t2a
+   *                 FROM t2
+   *                 WHERE EXISTS (SELECT min(t2a) FROM t3)
+   *                 )
+   * }}}
+   *
+   * In the subquery, the `min(t2a)` expression is going to be resolved to 
`outer(min(t2a))`. In
+   * fixed-point analyzer, the alias of this expression will be 
`min(outer(t2a))`, because alias
+   * resolution is triggered before [[UpdateOuterReferences]]. However, in 
single-pass analyzer, we
+   * first resolve the expression to `outer(min(t2a))` and then assign an 
alias name. This causes
+   * inconsistencies between analyzer results. In order to mitigate this 
issue, we save the alias
+   * name before updating outer reference.
+   */
+  val SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE =
+    TreeNodeTag[String]("single_pass_outer_aggregate_alias_name_override")
 }
 
 object VirtualColumn {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
index 6513db43639b..e58fdb8d0751 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
@@ -33,6 +33,7 @@ class ResolveAliasesSuite extends AnalysisTest {
 
   private lazy val t1 = LocalRelation("a".attr.int)
   private lazy val t2 = LocalRelation("b".attr.long)
+  private lazy val intAttr = AttributeReference("x", IntegerType)()
 
   private def checkAliasName(plan: LogicalPlan, expected: String): Unit = {
     val analyzed = getAnalyzer.execute(plan)
@@ -136,4 +137,100 @@ class ResolveAliasesSuite extends AnalysisTest {
       }
     }
   }
+
+  test("OuterReference in UnresolvedAlias is wrapped in Alias with default 
name") {
+    val outerRef = OuterReference(intAttr)
+    val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+    assert(result.isInstanceOf[Alias])
+    val alias = result.asInstanceOf[Alias]
+    assert(alias.child == outerRef)
+    assert(alias.name == "x")
+  }
+
+  test("OuterReference in UnresolvedAlias uses 
SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE") {
+    val outerRef = OuterReference(intAttr)
+    
outerRef.setTagValue(OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
 "min(x)")
+    val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+    assert(result.isInstanceOf[Alias])
+    val alias = result.asInstanceOf[Alias]
+    assert(alias.child == outerRef)
+    assert(alias.name == "min(x)")
+  }
+
+  test("OuterReference in UnresolvedAlias without tag falls back to 
toPrettySQL") {
+    val qualifiedAttr = AttributeReference("col", LongType)(qualifier = 
Seq("t1"))
+    val outerRef = OuterReference(qualifiedAttr)
+    val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+    assert(result.isInstanceOf[Alias])
+    val alias = result.asInstanceOf[Alias]
+    assert(alias.child == outerRef)
+    assert(alias.name == "col")
+  }
+
+  test("SINGLE_PASS_SQL_STRING_OVERRIDE alone does not affect OuterReference 
alias name") {
+    val outerRef = OuterReference(intAttr)
+    outerRef.setTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE, 
"sum(x)")
+    val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+    val alias = result.asInstanceOf[Alias]
+    assert(alias.name == "x")
+  }
+
+  test("Both SQL and alias override tags set - alias name uses alias 
override") {
+    val outerRef = OuterReference(intAttr)
+    outerRef.setTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE, 
"sql_override")
+    outerRef.setTagValue(
+      OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE, 
"alias_override")
+    val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+    val alias = result.asInstanceOf[Alias]
+    assert(alias.name == "alias_override")
+  }
+
+  test("OuterReference is wrapped in Alias rather than passed through as 
NamedExpression") {
+    val outerRef = OuterReference(intAttr)
+    val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+    assert(result.isInstanceOf[Alias],
+      "OuterReference should be wrapped in Alias, not passed through as 
NamedExpression")
+  }
+
+  test("OuterReference is handled correctly by assignAliases") {
+    val outerRef = OuterReference(intAttr)
+    
outerRef.setTagValue(OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
 "max(x)")
+    val exprs: Seq[NamedExpression] = Seq(UnresolvedAlias(outerRef, None))
+    val result = AliasResolution.assignAliases(exprs)
+    assert(result.length == 1)
+    val alias = result.head.asInstanceOf[Alias]
+    assert(alias.child == outerRef)
+    assert(alias.name == "max(x)")
+  }
+
+  test("assignAliases with mixed OuterReference and regular expressions") {
+    val outerRef = OuterReference(intAttr)
+    outerRef.setTagValue(
+      OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE, 
"count(x)")
+    val regularAttr = AttributeReference("y", LongType)()
+    val exprs: Seq[NamedExpression] = Seq(
+      UnresolvedAlias(outerRef, None),
+      regularAttr
+    )
+    val result = AliasResolution.assignAliases(exprs)
+    assert(result.length == 2)
+    val alias = result.head.asInstanceOf[Alias]
+    assert(alias.name == "count(x)")
+    assert(result(1) == regularAttr, "Regular attribute should be unchanged")
+  }
+
+  test("OuterReference with different data types") {
+    Seq(
+      ("str_col", StringType),
+      ("long_col", LongType),
+      ("dbl_col", DoubleType)
+    ).foreach { case (colName, dataType) =>
+      val attr = AttributeReference(colName, dataType)()
+      val outerRef = OuterReference(attr)
+      val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+      val alias = result.asInstanceOf[Alias]
+      assert(alias.child == outerRef)
+      assert(alias.name == colName)
+    }
+  }
 }
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 71cdb6eeaace..4f03e1a2c17a 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -722,7 +722,7 @@ table t
 -- !query analysis
 Filter exists#x [x#x]
 :  +- Project [x#x]
-:     +- Project [a#x, b#x, outer(x#x)]
+:     +- Project [a#x, b#x, outer(x#x) AS x#x]
 :        +- SubqueryAlias spark_catalog.default.other
 :           +- Relation spark_catalog.default.other[a#x,b#x] json
 +- PipeOperator


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to