This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebdc4ae6714 [SPARK-55794][SQL] Always alias `OuterReference`s
4ebdc4ae6714 is described below
commit 4ebdc4ae67147d58d1ae807c304b418a7771d7e2
Author: Mihailo Timotic <[email protected]>
AuthorDate: Mon Mar 2 13:19:38 2026 -0800
[SPARK-55794][SQL] Always alias `OuterReference`s
### What changes were proposed in this pull request?
In this PR I propose that we always alias `OuterReference`s.
### Why are the changes needed?
These changes are needed for 2 main reasons: to avoid potential issues
with exposing raw outer references and their expression IDs, and to provide
compatibility between fixed-point and single-pass analyzers.
For example, in a query like:
```
table t
|> where exists (
table other
|> extend t.x
|> select * except (a, b))
```
before this change, the output was:
```
Filter exists#x [x#1]
: +- Project [x#1]
: +- Project [a#3, b#4, outer(x#1)]
: +- SubqueryAlias spark_catalog.default.other
: +- Relation spark_catalog.default.other[a#3,b#4] json
+- PipeOperator
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#1,y#2] csv
```
Here the output of the subquery is exactly the same attribute as the outer
reference (`x#1`). This can potentially cause query failures or correctness
issues, but at the moment it only presents as a compatibility issue between
the fixed-point and single-pass analyzers.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new test cases + existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54576 from mihailotim-db/mihailo-timotic_data/fix_outer_ref.
Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../sql/catalyst/analysis/AliasResolution.scala | 8 +-
.../resolver/AggregateExpressionResolver.scala | 13 ++-
.../catalyst/analysis/resolver/AliasResolver.scala | 16 +---
.../catalyst/expressions/namedExpressions.scala | 51 +++++++++++-
.../catalyst/analysis/ResolveAliasesSuite.scala | 97 ++++++++++++++++++++++
.../analyzer-results/pipe-operators.sql.out | 2 +-
6 files changed, 165 insertions(+), 22 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
index c16170dd84dc..fd457f2c868d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AliasResolution.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.{
Generator,
GeneratorOuter,
Literal,
- NamedExpression
+ NamedExpression,
+ OuterReference
}
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ALIAS
import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS}
@@ -50,6 +51,11 @@ object AliasResolution {
def resolve(u: UnresolvedAlias): Expression = {
val UnresolvedAlias(child, optGenAliasFunc) = u
child match {
+ case outerReference: OuterReference =>
+ val aliasName = outerReference
+
.getTagValue(OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE)
+ .getOrElse(toPrettySQL(outerReference.e))
+ Alias(outerReference, aliasName)()
case ne: NamedExpression => ne
case go @ GeneratorOuter(g: Generator) if g.resolved => MultiAlias(go,
Nil)
case e if !e.resolved => u
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
index d98b699349c9..a5966ce06554 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala
@@ -148,8 +148,11 @@ class AggregateExpressionResolver(
* [[Aggregate.aggregateExpressions]] list. Otherwise, use the [[Alias]]
from the outer
* [[Aggregate]]. This alias will later be injected into the outer
[[Aggregate]];
* - Store the name that needs to be used for the [[OuterReference]] in
- * [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] computed based on
the
- * [[AggregateExpression]] without [[OuterReference]] pulled out.
+ * [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] and
+ * [[OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE]],
both computed from the
+ * pretty-printed original [[AggregateExpression]] (before
[[OuterReference]] stripping). The
+ * former overrides [[OuterReference.sql]] and the latter overrides the
alias name assigned by
+ * [[AliasResolution.resolve]].
* - In case we have an [[AggregateExpression]] inside a [[Sort]] operator,
we need to handle it
* in a special way (see [[handleAggregateExpressionOutsideAggregate]]
for more details).
* - Return the original [[AggregateExpression]] otherwise. This is done to
stay compatible
@@ -178,9 +181,11 @@ class AggregateExpressionResolver(
resolvedOuterAggregateExpression match {
case outerReference: OuterReference =>
+ val name = toPrettySQL(aggregateExpression)
+
outerReference.setTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE, name)
outerReference.setTagValue(
- OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE,
- toPrettySQL(aggregateExpression)
+ OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
+ name
)
outerReference
case other => other
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
index bc02c4bc4572..89258ffea5dd 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala
@@ -18,13 +18,7 @@
package org.apache.spark.sql.catalyst.analysis.resolver
import org.apache.spark.sql.catalyst.analysis.{AliasResolution,
UnresolvedAlias}
-import org.apache.spark.sql.catalyst.expressions.{
- Alias,
- AliasHelper,
- Expression,
- NamedExpression,
- OuterReference
-}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper,
Expression, NamedExpression}
import org.apache.spark.sql.errors.QueryCompilationErrors
/**
@@ -44,10 +38,8 @@ class AliasResolver(expressionResolver: ExpressionResolver)
/**
* Resolves [[UnresolvedAlias]] by resolving its child and computing the
alias name by calling
* [[AliasResolution]] on the result. After resolving it, we assign a
correct exprId to the
- * resulting [[Alias]]. In case result of the [[AliasResolution]] call is an
[[OuterReference]],
- * we create a new [[Alias]] using the [[AutoGeneratedAliasProvider]]. Here
we allow inner
- * aliases to persist until the end of single-pass resolution, after which
they will be removed
- * in the post-processing phase.
+ * resulting [[Alias]]. Here we allow inner aliases to persist until the end
of single-pass
+ * resolution, after which they will be removed in the post-processing phase.
*
* Resulting [[Alias]] must be added to the list of `availableAliases` in
the current
* [[NameScope]].
@@ -68,8 +60,6 @@ class AliasResolver(expressionResolver: ExpressionResolver)
val resultAlias =
expressionResolver.getExpressionIdAssigner.mapExpression(alias)
scopes.current.availableAliases.add(resultAlias.exprId)
resultAlias
- case outerReference: OuterReference =>
- autoGeneratedAliasProvider.newAlias(outerReference)
case _ =>
throw QueryCompilationErrors.unsupportedSinglePassAnalyzerFeature(
s"${resolvedNode.getClass} expression resolution"
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index ed06fb2ae05d..ccefdc0999ea 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -544,12 +544,57 @@ object OuterReference {
* single-pass implementation (first we extract the [[OuterReference]] and
then assign the name -
* in bottom-up manner).
*
- * In order to make single-pass and fixed-point implementations compatible
use earlier computed
- * name (if defined) for [[OuterReference]] (defined in
- * `AggregateExpressionResolver.handleOuterAggregateExpression`).
+ * For example, for a query like:
+ * {{{
+ * SELECT col1 AS alias
+ * FROM values(1)
+ * GROUP BY col1
+ * HAVING (
+ * SELECT col1 = 1
+ * )
+ * }}}
+ *
+ * Fixed-point analyzer will output:
+ *
+ * Filter cast(scalar-subquery#x [alias#x] as boolean)
+ * : +- Project [(outer(alias#x) = 1) AS (outer(col1) = 1)#x]
+ * : +- OneRowRelation
+ * +- Aggregate [col1#x], [col1#x AS alias#x]
+ * +- LocalRelation [col1#x]
+ *
+ * Notice that the expression in the subquery is `(outer(alias#x) = 1) AS
(outer(col1) = 1)`.
+ * This is because the initial underlying expression is `outer(col1) = 1`
which we alias, but
+ * we later update the actual outer reference to `outer(alias)`.
*/
val SINGLE_PASS_SQL_STRING_OVERRIDE =
TreeNodeTag[String]("single_pass_sql_string_override")
+
+ /**
+ * In fixed-point [[OuterReference]] is extracted in
[[UpdateOuterReferences]] which is invoked
+ * after the alias assignment ([[ResolveAliases]]) which is opposite of how
it is done in the
+ * single-pass implementation (first we extract the [[OuterReference]] and
then assign the name -
+ * in bottom-up manner).
+ *
+ * For example, in a query like:
+ * {{{
+ * SELECT t1a
+ * FROM t1
+ * WHERE t1a IN (
+ * SELECT t2a
+ * FROM t2
+ * WHERE EXISTS (SELECT min(t2a) FROM t3)
+ * )
+ * }}}
+ *
+ * In the subquery, the `min(t2a)` expression is going to be resolved to
`outer(min(t2a))`. In
+ * fixed-point analyzer, the alias of this expression will be
`min(outer(t2a))`, because alias
+ * resolution is triggered before [[UpdateOuterReferences]]. However, in
single-pass analyzer, we
+ * first resolve the expression to `outer(min(t2a))` and then assign an
alias name. This causes
+ * inconsistencies between analyzer results. In order to mitigate this
issue, we save the alias
+ * name before updating the outer reference.
+ */
+ val SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE =
+ TreeNodeTag[String]("single_pass_outer_aggregate_alias_name_override")
}
object VirtualColumn {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
index 6513db43639b..e58fdb8d0751 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala
@@ -33,6 +33,7 @@ class ResolveAliasesSuite extends AnalysisTest {
private lazy val t1 = LocalRelation("a".attr.int)
private lazy val t2 = LocalRelation("b".attr.long)
+ private lazy val intAttr = AttributeReference("x", IntegerType)()
private def checkAliasName(plan: LogicalPlan, expected: String): Unit = {
val analyzed = getAnalyzer.execute(plan)
@@ -136,4 +137,100 @@ class ResolveAliasesSuite extends AnalysisTest {
}
}
}
+
+ test("OuterReference in UnresolvedAlias is wrapped in Alias with default
name") {
+ val outerRef = OuterReference(intAttr)
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ assert(result.isInstanceOf[Alias])
+ val alias = result.asInstanceOf[Alias]
+ assert(alias.child == outerRef)
+ assert(alias.name == "x")
+ }
+
+ test("OuterReference in UnresolvedAlias uses
SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE") {
+ val outerRef = OuterReference(intAttr)
+
outerRef.setTagValue(OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
"min(x)")
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ assert(result.isInstanceOf[Alias])
+ val alias = result.asInstanceOf[Alias]
+ assert(alias.child == outerRef)
+ assert(alias.name == "min(x)")
+ }
+
+ test("OuterReference in UnresolvedAlias without tag falls back to
toPrettySQL") {
+ val qualifiedAttr = AttributeReference("col", LongType)(qualifier =
Seq("t1"))
+ val outerRef = OuterReference(qualifiedAttr)
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ assert(result.isInstanceOf[Alias])
+ val alias = result.asInstanceOf[Alias]
+ assert(alias.child == outerRef)
+ assert(alias.name == "col")
+ }
+
+ test("SINGLE_PASS_SQL_STRING_OVERRIDE alone does not affect OuterReference
alias name") {
+ val outerRef = OuterReference(intAttr)
+ outerRef.setTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE,
"sum(x)")
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ val alias = result.asInstanceOf[Alias]
+ assert(alias.name == "x")
+ }
+
+ test("Both SQL and alias override tags set - alias name uses alias
override") {
+ val outerRef = OuterReference(intAttr)
+ outerRef.setTagValue(OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE,
"sql_override")
+ outerRef.setTagValue(
+ OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
"alias_override")
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ val alias = result.asInstanceOf[Alias]
+ assert(alias.name == "alias_override")
+ }
+
+ test("OuterReference is wrapped in Alias rather than passed through as
NamedExpression") {
+ val outerRef = OuterReference(intAttr)
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ assert(result.isInstanceOf[Alias],
+ "OuterReference should be wrapped in Alias, not passed through as
NamedExpression")
+ }
+
+ test("OuterReference is handled correctly by assignAliases") {
+ val outerRef = OuterReference(intAttr)
+
outerRef.setTagValue(OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
"max(x)")
+ val exprs: Seq[NamedExpression] = Seq(UnresolvedAlias(outerRef, None))
+ val result = AliasResolution.assignAliases(exprs)
+ assert(result.length == 1)
+ val alias = result.head.asInstanceOf[Alias]
+ assert(alias.child == outerRef)
+ assert(alias.name == "max(x)")
+ }
+
+ test("assignAliases with mixed OuterReference and regular expressions") {
+ val outerRef = OuterReference(intAttr)
+ outerRef.setTagValue(
+ OuterReference.SINGLE_PASS_OUTER_AGGREGATE_ALIAS_NAME_OVERRIDE,
"count(x)")
+ val regularAttr = AttributeReference("y", LongType)()
+ val exprs: Seq[NamedExpression] = Seq(
+ UnresolvedAlias(outerRef, None),
+ regularAttr
+ )
+ val result = AliasResolution.assignAliases(exprs)
+ assert(result.length == 2)
+ val alias = result.head.asInstanceOf[Alias]
+ assert(alias.name == "count(x)")
+ assert(result(1) == regularAttr, "Regular attribute should be unchanged")
+ }
+
+ test("OuterReference with different data types") {
+ Seq(
+ ("str_col", StringType),
+ ("long_col", LongType),
+ ("dbl_col", DoubleType)
+ ).foreach { case (colName, dataType) =>
+ val attr = AttributeReference(colName, dataType)()
+ val outerRef = OuterReference(attr)
+ val result = AliasResolution.resolve(UnresolvedAlias(outerRef, None))
+ val alias = result.asInstanceOf[Alias]
+ assert(alias.child == outerRef)
+ assert(alias.name == colName)
+ }
+ }
}
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 71cdb6eeaace..4f03e1a2c17a 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -722,7 +722,7 @@ table t
-- !query analysis
Filter exists#x [x#x]
: +- Project [x#x]
-: +- Project [a#x, b#x, outer(x#x)]
+: +- Project [a#x, b#x, outer(x#x) AS x#x]
: +- SubqueryAlias spark_catalog.default.other
: +- Relation spark_catalog.default.other[a#x,b#x] json
+- PipeOperator
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]