This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 0b627ae47d99 [SPARK-48114][SQL] Move subquery validation out of CheckAnalysis
0b627ae47d99 is described below

commit 0b627ae47d99468d3b55b245c61b721e94c24ca4
Author: Vladimir Golubev <[email protected]>
AuthorDate: Tue Feb 18 22:49:48 2025 +0100

    [SPARK-48114][SQL] Move subquery validation out of CheckAnalysis
    
    ### What changes were proposed in this pull request?
    
    Move subquery validation out of `CheckAnalysis`.
    
    ### Why are the changes needed?
    
    To be reused in the single-pass Analyzer.
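
    As a rough, illustrative sketch of the intended reuse (the object and method names in the
    single-pass analyzer call site below are hypothetical and not part of this patch), the
    extracted entry point only needs the operator hosting the subquery expression and the
    expression itself:

        import org.apache.spark.sql.catalyst.analysis.ValidateSubqueryExpression
        import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

        // Legacy path, as wired up in this patch:
        //   CheckAnalysis.checkSubqueryExpression -> ValidateSubqueryExpression(plan, expr)
        //
        // Hypothetical single-pass analyzer validation step (sketch only):
        object SinglePassSubqueryValidation {
          def validateSubqueries(plan: LogicalPlan): Unit = {
            plan.foreach { operator =>
              operator.expressions.foreach(_.foreach {
                case subquery: SubqueryExpression =>
                  // Validate each subquery against the operator that hosts it.
                  ValidateSubqueryExpression(operator, subquery)
                case _ =>
              })
            }
          }
        }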
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, refactoring.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49994 from vladimirg-db/vladimirg-db/move-subquery-validation-out-of-check-analysis.
    
    Authored-by: Vladimir Golubev <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
    (cherry picked from commit ead7d58ab0b664951f76f98db23ffb1409bf0e5f)
    Signed-off-by: Max Gekk <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 522 +-------------------
 .../spark/sql/catalyst/analysis/PlanToString.scala |  36 ++
 .../analysis/ValidateSubqueryExpression.scala      | 538 +++++++++++++++++++++
 4 files changed, 578 insertions(+), 520 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7b23abcf2f50..7f59e0f47373 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -232,7 +232,7 @@ object AnalysisContext {
  * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
  */
 class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor[LogicalPlan]
-  with CheckAnalysis with SQLConfHelper with ColumnResolutionHelper {
+  with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper {
 
   private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
   private val relationResolution = new RelationResolution(catalogManager)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c7d5c355270f..1b45fcde9126 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -19,15 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable
 
 import org.apache.spark.{SparkException, SparkThrowable}
-import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import 
org.apache.spark.sql.catalyst.analysis.ResolveWithCTE.{checkForSelfReferenceInSubquery,
 checkIfSelfReferenceIsPlacedCorrectly}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc}
-import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, 
DecorrelateInnerQuery, InlineCTE}
-import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.optimizer.InlineCTE
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import 
org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE,
 PLAN_EXPRESSION, UNRESOLVED_WINDOW_EXPRESSION}
@@ -38,12 +35,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.Utils
 
 /**
  * Throws user facing errors when passed invalid queries that fail to analyze.
  */
-trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase with Logging {
+trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString {
 
   protected def isView(nameParts: Seq[String]): Boolean
 
@@ -1037,214 +1033,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     }
   }
 
-  private def scrubOutIds(string: String): String =
-    string.replaceAll("#\\d+", "#x")
-      .replaceAll("operator id = \\d+", "operator id = #x")
-      .replaceAll("rand\\(-?\\d+\\)", "rand(number)")
-
-  private def planToString(plan: LogicalPlan): String = {
-    if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString
-  }
-
-  private def exprsToString(exprs: Seq[Expression]): String = {
-    val result = exprs.map(_.toString).mkString("\n")
-    if (Utils.isTesting) scrubOutIds(result) else result
-  }
-
-  /**
-   * Validates subquery expressions in the plan. Upon failure, returns an user facing error.
-   */
   def checkSubqueryExpression(plan: LogicalPlan, expr: SubqueryExpression): Unit = {
-    def checkAggregateInScalarSubquery(
-        conditions: Seq[Expression],
-        query: LogicalPlan, agg: Aggregate): Unit = {
-      // Make sure correlated scalar subqueries contain one row for every 
outer row by
-      // enforcing that they are aggregates containing exactly one aggregate 
expression.
-      val aggregates = agg.expressions.flatMap(_.collect {
-        case a: AggregateExpression => a
-      })
-      if (aggregates.isEmpty) {
-        expr.failAnalysis(
-          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-            "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
-          messageParameters = Map.empty)
-      }
-
-      val nonEquivalentGroupByExprs = nonEquivalentGroupbyCols(query, agg)
-      val invalidCols = if (!SQLConf.get.getConf(
-        
SQLConf.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE))
 {
-        nonEquivalentGroupByExprs
-      } else {
-        // Legacy incorrect logic for checking for invalid group-by columns 
(see SPARK-48503).
-        // Allows any inner attribute that appears in a correlated predicate, 
even if it is a
-        // non-equality predicate or under an operator that can change the 
values of the attribute
-        // (see comments on getCorrelatedEquivalentInnerColumns for examples).
-        // Note: groupByCols does not contain outer refs - grouping by an 
outer ref is always ok
-        val groupByCols = 
AttributeSet(agg.groupingExpressions.flatMap(_.references))
-        val subqueryColumns = 
getCorrelatedPredicates(query).flatMap(_.references)
-          .filterNot(conditions.flatMap(_.references).contains)
-        val correlatedCols = AttributeSet(subqueryColumns)
-        val invalidColsLegacy = groupByCols -- correlatedCols
-        if (!nonEquivalentGroupByExprs.isEmpty && invalidColsLegacy.isEmpty) {
-          logWarning(log"Using legacy behavior for " +
-            log"${MDC(LogKeys.CONFIG, SQLConf
-            
.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE.key)}. 
" +
-            log"Query would be rejected with non-legacy behavior but is 
allowed by " +
-            log"legacy behavior. Query may be invalid and return wrong results 
if the scalar " +
-            log"subquery's group-by outputs multiple rows.")
-        }
-        invalidColsLegacy
-      }
-
-      if (invalidCols.nonEmpty) {
-        val names = invalidCols.map { el =>
-          el match {
-            case attr: Attribute => attr.name
-            case expr: Expression => expr.toString
-          }
-        }
-        expr.failAnalysis(
-          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-            "NON_CORRELATED_COLUMNS_IN_GROUP_BY",
-          messageParameters = Map("value" -> names.mkString(",")))
-      }
-    }
-
-    // Skip subquery aliases added by the Analyzer as well as hints.
-    // For projects, do the necessary mapping and skip to its child.
-    @scala.annotation.tailrec
-    def cleanQueryInScalarSubquery(p: LogicalPlan): LogicalPlan = p match {
-      case s: SubqueryAlias => cleanQueryInScalarSubquery(s.child)
-      // Skip SQL function node added by the Analyzer
-      case s: SQLFunctionNode => cleanQueryInScalarSubquery(s.child)
-      case p: Project => cleanQueryInScalarSubquery(p.child)
-      case h: ResolvedHint => cleanQueryInScalarSubquery(h.child)
-      case child => child
-    }
-
-    // Check whether the given expressions contains the subquery expression.
-    def containsExpr(expressions: Seq[Expression]): Boolean = {
-      expressions.exists(_.exists(_.semanticEquals(expr)))
-    }
-
-    def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = 
p match {
-      case f: Filter =>
-        if (hasOuterReferences(expr.plan)) {
-          expr.plan.expressions.foreach(_.foreachUp {
-            case o: OuterReference =>
-              p.children.foreach(e =>
-                if (!e.output.exists(_.exprId == o.exprId)) {
-                  o.failAnalysis(
-                    errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-                      "CORRELATED_COLUMN_NOT_FOUND",
-                    messageParameters = Map("value" -> o.name))
-                })
-            case _ =>
-          })
-        }
-      case _ =>
-    }
-
-    // Validate the subquery plan.
     checkAnalysis0(expr.plan)
-
-    // Check if there is outer attribute that cannot be found from the plan.
-    checkOuterReference(plan, expr)
-
-    expr match {
-      case ScalarSubquery(query, outerAttrs, _, _, _, _, _) =>
-        // Scalar subquery must return one column as output.
-        if (query.output.size != 1) {
-          throw 
QueryCompilationErrors.subqueryReturnMoreThanOneColumn(query.output.size,
-            expr.origin)
-        }
-
-        if (outerAttrs.nonEmpty) {
-          if (!SQLConf.get.getConf(SQLConf.SCALAR_SUBQUERY_USE_SINGLE_JOIN)) {
-            cleanQueryInScalarSubquery(query) match {
-              case a: Aggregate => checkAggregateInScalarSubquery(outerAttrs, 
query, a)
-              case Filter(_, a: Aggregate) => 
checkAggregateInScalarSubquery(outerAttrs, query, a)
-              case p: LogicalPlan if p.maxRows.exists(_ <= 1) => // Ok
-              case other =>
-                expr.failAnalysis(
-                  errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-                    "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
-                  messageParameters = Map.empty)
-            }
-          }
-
-          // Only certain operators are allowed to host subquery expression 
containing
-          // outer references.
-          plan match {
-            case _: Filter | _: Project | _: SupportsSubquery => // Ok
-            case a: Aggregate =>
-              // If the correlated scalar subquery is in the grouping 
expressions of an Aggregate,
-              // it must also be in the aggregate expressions to be rewritten 
in the optimization
-              // phase.
-              if (containsExpr(a.groupingExpressions) && 
!containsExpr(a.aggregateExpressions)) {
-                a.failAnalysis(
-                  errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-                    "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
-                  messageParameters = Map.empty)
-              }
-            case other =>
-              other.failAnalysis(
-                errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-                  "UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY",
-                messageParameters = Map("treeNode" -> planToString(other)))
-          }
-        }
-        // Validate to make sure the correlations appearing in the query are 
valid and
-        // allowed by spark.
-        checkCorrelationsInSubquery(expr.plan, isScalar = true)
-
-      case _: LateralSubquery =>
-        assert(plan.isInstanceOf[LateralJoin])
-        val join = plan.asInstanceOf[LateralJoin]
-        // A lateral join with a multi-row outer query and a non-deterministic 
lateral subquery
-        // cannot be decorrelated. Otherwise it may produce incorrect results.
-        if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) {
-          cleanQueryInScalarSubquery(join.right.plan) match {
-            // Python UDTFs are by default non-deterministic. They are 
constructed as a
-            // OneRowRelation subquery and can be rewritten by the optimizer 
without
-            // any decorrelation.
-            case Generate(_: PythonUDTF, _, _, _, _, _: OneRowRelation)
-              if 
SQLConf.get.getConf(SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY) =>  // Ok
-            case _ =>
-              expr.failAnalysis(
-                errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-                  "NON_DETERMINISTIC_LATERAL_SUBQUERIES",
-                messageParameters = Map("treeNode" -> planToString(plan)))
-          }
-        }
-        // Check if the lateral join's join condition is deterministic.
-        if (join.condition.exists(!_.deterministic)) {
-          join.condition.get.failAnalysis(
-            errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-              "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC",
-            messageParameters = Map("condition" -> join.condition.get.sql))
-        }
-        // Validate to make sure the correlations appearing in the query are 
valid and
-        // allowed by spark.
-        checkCorrelationsInSubquery(expr.plan, isLateral = true)
-
-      case _: FunctionTableSubqueryArgumentExpression =>
-        // Do nothing here, since we will check for this pattern later.
-
-      case inSubqueryOrExistsSubquery =>
-        plan match {
-          case _: Filter | _: SupportsSubquery | _: Join |
-            _: Project | _: Aggregate | _: Window => // Ok
-          case _ =>
-            expr.failAnalysis(
-              errorClass =
-                
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_IN_EXISTS_SUBQUERY",
-              messageParameters = Map("treeNode" -> planToString(plan)))
-        }
-        // Validate to make sure the correlations appearing in the query are 
valid and
-        // allowed by spark.
-        checkCorrelationsInSubquery(expr.plan)
-    }
+    ValidateSubqueryExpression(plan, expr)
   }
 
   /**
@@ -1280,313 +1071,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     check(plan)
   }
 
-  /**
-   * Validates to make sure the outer references appearing inside the subquery
-   * are allowed.
-   */
-  private def checkCorrelationsInSubquery(
-      sub: LogicalPlan,
-      isScalar: Boolean = false,
-      isLateral: Boolean = false): Unit = {
-    // Some query shapes are only supported with the DecorrelateInnerQuery 
framework.
-    // Support for Exists and IN subqueries is subject to a separate config 
flag
-    // 'decorrelateInnerQueryEnabledForExistsIn'.
-    val usingDecorrelateInnerQueryFramework =
-      (SQLConf.get.decorrelateInnerQueryEnabledForExistsIn || isScalar || 
isLateral) &&
-        SQLConf.get.decorrelateInnerQueryEnabled
-
-    // Validate that correlated aggregate expression do not contain a mixture
-    // of outer and local references.
-    def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
-      expr.foreach {
-        case a: AggregateExpression if containsOuter(a) =>
-          if (a.references.nonEmpty) {
-            a.failAnalysis(
-              errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-                "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
-              messageParameters = Map("function" -> a.sql))
-          }
-        case _ =>
-      }
-    }
-
-    // Make sure expressions of a plan do not contain outer references.
-    def failOnOuterReferenceInPlan(p: LogicalPlan): Unit = {
-      if (p.expressions.exists(containsOuter)) {
-        p.failAnalysis(
-          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-            "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
-          messageParameters = Map("treeNode" -> planToString(p)))
-      }
-    }
-
-    // Check whether the logical plan node can host outer references.
-    // A `Project` can host outer references if it is inside a scalar or a 
lateral subquery and
-    // DecorrelateInnerQuery is enabled. Otherwise, only Filter can only outer 
references.
-    def canHostOuter(plan: LogicalPlan): Boolean = plan match {
-      case _: Filter => true
-      case _: Project => usingDecorrelateInnerQueryFramework
-      case _: Join => usingDecorrelateInnerQueryFramework
-      case _ => false
-    }
-
-    // Make sure a plan's expressions do not contain :
-    // 1. Aggregate expressions that have mixture of outer and local 
references.
-    // 2. Expressions containing outer references on plan nodes other than 
allowed operators.
-    def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
-      p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
-      val exprs = stripOuterReferences(p.expressions.filter(expr => 
containsOuter(expr)))
-      if (!canHostOuter(p) && !exprs.isEmpty) {
-        p.failAnalysis(
-          errorClass =
-            "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
-          messageParameters = Map("sqlExprs" -> 
exprs.map(toSQLExpr).mkString(",")))
-      }
-    }
-
-    // SPARK-17348: A potential incorrect result case.
-    // When a correlated predicate is a non-equality predicate,
-    // certain operators are not permitted from the operator
-    // hosting the correlated predicate up to the operator on the outer table.
-    // Otherwise, the pull up of the correlated predicate
-    // will generate a plan with a different semantics
-    // which could return incorrect result.
-    // Currently we check for Aggregate and Window operators
-    //
-    // Below shows an example of a Logical Plan during Analyzer phase that
-    // show this problem. Pulling the correlated predicate [outer(c2#77) >= ..]
-    // through the Aggregate (or Window) operator could alter the result of
-    // the Aggregate.
-    //
-    // Project [c1#76]
-    // +- Project [c1#87, c2#88]
-    // :  (Aggregate or Window operator)
-    // :  +- Filter [outer(c2#77) >= c2#88)]
-    // :     +- SubqueryAlias t2, `t2`
-    // :        +- Project [_1#84 AS c1#87, _2#85 AS c2#88]
-    // :           +- LocalRelation [_1#84, _2#85]
-    // +- SubqueryAlias t1, `t1`
-    // +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
-    // +- LocalRelation [_1#73, _2#74]
-    // SPARK-35080: The same issue can happen to correlated equality 
predicates when
-    // they do not guarantee one-to-one mapping between inner and outer 
attributes.
-    // For example:
-    // Table:
-    //   t1(a, b): [(0, 6), (1, 5), (2, 4)]
-    //   t2(c): [(6)]
-    //
-    // Query:
-    //   SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
-    //
-    // Original subquery plan:
-    //   Aggregate [count(1)]
-    //   +- Filter ((a + b) = outer(c))
-    //      +- LocalRelation [a, b]
-    //
-    // Plan after pulling up correlated predicates:
-    //   Aggregate [a, b] [count(1), a, b]
-    //   +- LocalRelation [a, b]
-    //
-    // Plan after rewrite:
-    //   Project [c1, count(1)]
-    //   +- Join LeftOuter ((a + b) = c)
-    //      :- LocalRelation [c]
-    //      +- Aggregate [a, b] [count(1), a, b]
-    //         +- LocalRelation [a, b]
-    //
-    // The right hand side of the join transformed from the subquery will 
output
-    //   count(1) | a | b
-    //      1     | 0 | 6
-    //      1     | 1 | 5
-    //      1     | 2 | 4
-    // and the plan after rewrite will give the original query incorrect 
results.
-    def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: 
LogicalPlan): Unit = {
-      // Correlated non-equality predicates are only supported with the 
decorrelate
-      // inner query framework. Currently we only use this new framework for 
scalar
-      // and lateral subqueries.
-      val allowNonEqualityPredicates = usingDecorrelateInnerQueryFramework
-      if (!allowNonEqualityPredicates && predicates.nonEmpty) {
-        // Report a non-supported case as an exception
-        p.failAnalysis(
-          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-            "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
-          messageParameters =
-            Map("treeNode" -> 
s"${exprsToString(predicates)}\n${planToString(p)}"))
-      }
-    }
-
-    // Recursively check invalid outer references in the plan.
-    def checkPlan(
-        plan: LogicalPlan,
-        aggregated: Boolean = false,
-        canContainOuter: Boolean = true): Unit = {
-
-      if (!canContainOuter) {
-        failOnOuterReferenceInPlan(plan)
-      }
-
-      // Approve operators allowed in a correlated subquery
-      // There are 4 categories:
-      // 1. Operators that are allowed anywhere in a correlated subquery, and,
-      //    by definition of the operators, they either do not contain
-      //    any columns or cannot host outer references.
-      // 2. Operators that are allowed anywhere in a correlated subquery
-      //    so long as they do not host outer references.
-      // 3. Operators that need special handling. These operators are
-      //    Filter, Join, Aggregate, and Generate.
-      //
-      // Any operators that are not in the above list are allowed
-      // in a correlated subquery only if they are not on a correlation path.
-      // In other word, these operators are allowed only under a correlation 
point.
-      //
-      // A correlation path is defined as the sub-tree of all the operators 
that
-      // are on the path from the operator hosting the correlated expressions
-      // up to the operator producing the correlated values.
-      plan match {
-        // Category 1:
-        // ResolvedHint, LeafNode, Repartition, and SubqueryAlias
-        case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _: 
SubqueryAlias) =>
-          p.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter))
-
-        case p @ (_ : Union | _: SetOperation) =>
-          // Set operations (e.g. UNION) containing correlated values are only 
supported
-          // with DecorrelateInnerQuery framework.
-          val childCanContainOuter = (canContainOuter
-            && usingDecorrelateInnerQueryFramework
-            && SQLConf.get.getConf(SQLConf.DECORRELATE_SET_OPS_ENABLED))
-          p.children.foreach(child => checkPlan(child, aggregated, 
childCanContainOuter))
-
-        // Category 2:
-        // These operators can be anywhere in a correlated subquery.
-        // so long as they do not host outer references in the operators.
-        case p: Project =>
-          failOnInvalidOuterReference(p)
-          checkPlan(p.child, aggregated, canContainOuter)
-
-        case s: Sort =>
-          failOnInvalidOuterReference(s)
-          checkPlan(s.child, aggregated, canContainOuter)
-
-        case r: RepartitionByExpression =>
-          failOnInvalidOuterReference(r)
-          checkPlan(r.child, aggregated, canContainOuter)
-
-        case l: LateralJoin =>
-          failOnInvalidOuterReference(l)
-          checkPlan(l.child, aggregated, canContainOuter)
-
-        // Category 3:
-        // Filter is one of the two operators allowed to host correlated 
expressions.
-        // The other operator is Join. Filter can be anywhere in a correlated 
subquery.
-        case f: Filter =>
-          failOnInvalidOuterReference(f)
-          val (correlated, _) = 
splitConjunctivePredicates(f.condition).partition(containsOuter)
-          val unsupportedPredicates = 
correlated.filterNot(DecorrelateInnerQuery.canPullUpOverAgg)
-          if (aggregated) {
-            failOnUnsupportedCorrelatedPredicate(unsupportedPredicates, f)
-          }
-          checkPlan(f.child, aggregated, canContainOuter)
-
-        // Aggregate cannot host any correlated expressions
-        // It can be on a correlation path if the correlation contains
-        // only supported correlated equality predicates.
-        // It cannot be on a correlation path if the correlation has
-        // non-equality correlated predicates.
-        case a: Aggregate =>
-          failOnInvalidOuterReference(a)
-          checkPlan(a.child, aggregated = true, canContainOuter)
-
-        // Same as Aggregate above.
-        case w: Window =>
-          failOnInvalidOuterReference(w)
-          checkPlan(w.child, aggregated = true, canContainOuter)
-
-        // Distinct does not host any correlated expressions, but during the 
optimization phase
-        // it will be rewritten as Aggregate, which can only be on a 
correlation path if the
-        // correlation contains only the supported correlated equality 
predicates.
-        // Only block it for lateral subqueries because scalar subqueries must 
be aggregated
-        // and it does not impact the results for IN/EXISTS subqueries.
-        case d: Distinct =>
-          checkPlan(d.child, aggregated = isLateral, canContainOuter)
-
-        // Join can host correlated expressions.
-        case j @ Join(left, right, joinType, _, _) =>
-          failOnInvalidOuterReference(j)
-          joinType match {
-            // Inner join, like Filter, can be anywhere.
-            case _: InnerLike =>
-              j.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter))
-
-            // Left outer join's right operand cannot be on a correlation path.
-            // LeftAnti and ExistenceJoin are special cases of LeftOuter.
-            // Note that ExistenceJoin cannot be expressed externally in both 
SQL and DataFrame
-            // so it should not show up here in Analysis phase. This is just a 
safety net.
-            //
-            // LeftSemi does not allow output from the right operand.
-            // Any correlated references in the subplan
-            // of the right operand cannot be pulled up.
-            case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
-              checkPlan(left, aggregated, canContainOuter)
-              checkPlan(right, aggregated, canContainOuter = false)
-
-            // Likewise, Right outer join's left operand cannot be on a 
correlation path.
-            case RightOuter =>
-              checkPlan(left, aggregated, canContainOuter = false)
-              checkPlan(right, aggregated, canContainOuter)
-
-            // Any other join types not explicitly listed above,
-            // including Full outer join, are treated as Category 4.
-            case _ =>
-              j.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter = false))
-          }
-
-        // Generator with join=true, i.e., expressed with
-        // LATERAL VIEW [OUTER], similar to inner join,
-        // allows to have correlation under it
-        // but must not host any outer references.
-        // Note:
-        // Generator with requiredChildOutput.isEmpty is treated as Category 4.
-        case g: Generate if g.requiredChildOutput.nonEmpty =>
-          failOnInvalidOuterReference(g)
-          checkPlan(g.child, aggregated, canContainOuter)
-
-        // Correlated subquery can have a LIMIT clause
-        case l @ Limit(_, input) =>
-          failOnInvalidOuterReference(l)
-          checkPlan(
-            input,
-            aggregated,
-            canContainOuter && 
SQLConf.get.getConf(SQLConf.DECORRELATE_LIMIT_ENABLED))
-
-        case o @ Offset(_, input) =>
-          failOnInvalidOuterReference(o)
-          checkPlan(
-            input,
-            aggregated,
-            canContainOuter && 
SQLConf.get.getConf(SQLConf.DECORRELATE_OFFSET_ENABLED))
-
-        // We always inline CTE relations before analysis check, and only 
un-referenced CTE
-        // relations will be kept in the plan. Here we should simply skip them 
and check the
-        // children, as un-referenced CTE relations won't be executed anyway 
and doesn't need to
-        // be restricted by the current subquery correlation limitations.
-        case _: WithCTE | _: CTERelationDef =>
-          plan.children.foreach(p => checkPlan(p, aggregated, canContainOuter))
-
-        // Category 4: Any other operators not in the above 3 categories
-        // cannot be on a correlation path, that is they are allowed only
-        // under a correlation point but they and their descendant operators
-        // are not allowed to have any correlated expressions.
-        case p =>
-          p.children.foreach(p => checkPlan(p, aggregated, canContainOuter = 
false))
-      }
-    }
-
-    // Simplify the predicates before validating any unsupported correlation 
patterns in the plan.
-    AnalysisHelper.allowInvokingTransformsInAnalyzer {
-      checkPlan(BooleanSimplification(sub))
-    }
-  }
-
   /**
    * Validates the options used for alter table commands after table and 
columns are resolved.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PlanToString.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PlanToString.scala
new file mode 100644
index 000000000000..4f39c6cd796a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PlanToString.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.util.Utils
+
+/**
+ * A plan-printing utility for [[CheckAnalysis]].
+ */
+trait PlanToString {
+  protected def planToString(plan: LogicalPlan): String = {
+    if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString
+  }
+
+  protected def scrubOutIds(string: String): String =
+    string
+      .replaceAll("#\\d+", "#x")
+      .replaceAll("operator id = \\d+", "operator id = #x")
+      .replaceAll("rand\\(-?\\d+\\)", "rand(number)")
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ValidateSubqueryExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ValidateSubqueryExpression.scala
new file mode 100644
index 000000000000..cc863d4c03ef
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ValidateSubqueryExpression.scala
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, 
DecorrelateInnerQuery}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+object ValidateSubqueryExpression
+    extends PredicateHelper with QueryErrorsBase with PlanToString with 
Logging {
+
+  /**
+   * Validates subquery expressions in the plan. Upon failure, returns an user 
facing error.
+   */
+  def apply(plan: LogicalPlan, expr: SubqueryExpression): Unit = {
+    def checkAggregateInScalarSubquery(
+        conditions: Seq[Expression],
+        query: LogicalPlan, agg: Aggregate): Unit = {
+      // Make sure correlated scalar subqueries contain one row for every 
outer row by
+      // enforcing that they are aggregates containing exactly one aggregate 
expression.
+      val aggregates = agg.expressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      })
+      if (aggregates.isEmpty) {
+        expr.failAnalysis(
+          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+            "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
+          messageParameters = Map.empty)
+      }
+
+      val nonEquivalentGroupByExprs = nonEquivalentGroupbyCols(query, agg)
+      val invalidCols = if (!SQLConf.get.getConf(
+        
SQLConf.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE))
 {
+        nonEquivalentGroupByExprs
+      } else {
+        // Legacy incorrect logic for checking for invalid group-by columns 
(see SPARK-48503).
+        // Allows any inner attribute that appears in a correlated predicate, 
even if it is a
+        // non-equality predicate or under an operator that can change the 
values of the attribute
+        // (see comments on getCorrelatedEquivalentInnerColumns for examples).
+        // Note: groupByCols does not contain outer refs - grouping by an 
outer ref is always ok
+        val groupByCols = 
AttributeSet(agg.groupingExpressions.flatMap(_.references))
+        val subqueryColumns = 
getCorrelatedPredicates(query).flatMap(_.references)
+          .filterNot(conditions.flatMap(_.references).contains)
+        val correlatedCols = AttributeSet(subqueryColumns)
+        val invalidColsLegacy = groupByCols -- correlatedCols
+        if (!nonEquivalentGroupByExprs.isEmpty && invalidColsLegacy.isEmpty) {
+          logWarning(log"Using legacy behavior for " +
+            log"${MDC(LogKeys.CONFIG, SQLConf
+            
.LEGACY_SCALAR_SUBQUERY_ALLOW_GROUP_BY_NON_EQUALITY_CORRELATED_PREDICATE.key)}. 
" +
+            log"Query would be rejected with non-legacy behavior but is 
allowed by " +
+            log"legacy behavior. Query may be invalid and return wrong results 
if the scalar " +
+            log"subquery's group-by outputs multiple rows.")
+        }
+        invalidColsLegacy
+      }
+
+      if (invalidCols.nonEmpty) {
+        val names = invalidCols.map { el =>
+          el match {
+            case attr: Attribute => attr.name
+            case expr: Expression => expr.toString
+          }
+        }
+        expr.failAnalysis(
+          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+            "NON_CORRELATED_COLUMNS_IN_GROUP_BY",
+          messageParameters = Map("value" -> names.mkString(",")))
+      }
+    }
+
+    // Skip subquery aliases added by the Analyzer as well as hints.
+    // For projects, do the necessary mapping and skip to its child.
+    @scala.annotation.tailrec
+    def cleanQueryInScalarSubquery(p: LogicalPlan): LogicalPlan = p match {
+      case s: SubqueryAlias => cleanQueryInScalarSubquery(s.child)
+      // Skip SQL function node added by the Analyzer
+      case s: SQLFunctionNode => cleanQueryInScalarSubquery(s.child)
+      case p: Project => cleanQueryInScalarSubquery(p.child)
+      case h: ResolvedHint => cleanQueryInScalarSubquery(h.child)
+      case child => child
+    }
+
+    // Check whether the given expressions contains the subquery expression.
+    def containsExpr(expressions: Seq[Expression]): Boolean = {
+      expressions.exists(_.exists(_.semanticEquals(expr)))
+    }
+
+    def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = 
p match {
+      case f: Filter =>
+        if (hasOuterReferences(expr.plan)) {
+          expr.plan.expressions.foreach(_.foreachUp {
+            case o: OuterReference =>
+              p.children.foreach(e =>
+                if (!e.output.exists(_.exprId == o.exprId)) {
+                  o.failAnalysis(
+                    errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+                      "CORRELATED_COLUMN_NOT_FOUND",
+                    messageParameters = Map("value" -> o.name))
+                })
+            case _ =>
+          })
+        }
+      case _ =>
+    }
+
+    // Check if there is outer attribute that cannot be found from the plan.
+    checkOuterReference(plan, expr)
+
+    expr match {
+      case ScalarSubquery(query, outerAttrs, _, _, _, _, _) =>
+        // Scalar subquery must return one column as output.
+        if (query.output.size != 1) {
+          throw 
QueryCompilationErrors.subqueryReturnMoreThanOneColumn(query.output.size,
+            expr.origin)
+        }
+
+        if (outerAttrs.nonEmpty) {
+          if (!SQLConf.get.getConf(SQLConf.SCALAR_SUBQUERY_USE_SINGLE_JOIN)) {
+            cleanQueryInScalarSubquery(query) match {
+              case a: Aggregate => checkAggregateInScalarSubquery(outerAttrs, 
query, a)
+              case Filter(_, a: Aggregate) => 
checkAggregateInScalarSubquery(outerAttrs, query, a)
+              case p: LogicalPlan if p.maxRows.exists(_ <= 1) => // Ok
+              case other =>
+                expr.failAnalysis(
+                  errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+                    "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
+                  messageParameters = Map.empty)
+            }
+          }
+
+          // Only certain operators are allowed to host subquery expression 
containing
+          // outer references.
+          plan match {
+            case _: Filter | _: Project | _: SupportsSubquery => // Ok
+            case a: Aggregate =>
+              // If the correlated scalar subquery is in the grouping 
expressions of an Aggregate,
+              // it must also be in the aggregate expressions to be rewritten 
in the optimization
+              // phase.
+              if (containsExpr(a.groupingExpressions) && 
!containsExpr(a.aggregateExpressions)) {
+                a.failAnalysis(
+                  errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+                    "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
+                  messageParameters = Map.empty)
+              }
+            case other =>
+              other.failAnalysis(
+                errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+                  "UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY",
+                messageParameters = Map("treeNode" -> planToString(other)))
+          }
+        }
+        // Validate to make sure the correlations appearing in the query are 
valid and
+        // allowed by spark.
+        checkCorrelationsInSubquery(expr.plan, isScalar = true)
+
+      case _: LateralSubquery =>
+        assert(plan.isInstanceOf[LateralJoin])
+        val join = plan.asInstanceOf[LateralJoin]
+        // A lateral join with a multi-row outer query and a non-deterministic 
lateral subquery
+        // cannot be decorrelated. Otherwise it may produce incorrect results.
+        if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) {
+          cleanQueryInScalarSubquery(join.right.plan) match {
+            // Python UDTFs are by default non-deterministic. They are 
constructed as a
+            // OneRowRelation subquery and can be rewritten by the optimizer 
without
+            // any decorrelation.
+            case Generate(_: PythonUDTF, _, _, _, _, _: OneRowRelation)
+              if 
SQLConf.get.getConf(SQLConf.OPTIMIZE_ONE_ROW_RELATION_SUBQUERY) =>  // Ok
+            case _ =>
+              expr.failAnalysis(
+                errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+                  "NON_DETERMINISTIC_LATERAL_SUBQUERIES",
+                messageParameters = Map("treeNode" -> planToString(plan)))
+          }
+        }
+        // Check if the lateral join's join condition is deterministic.
+        if (join.condition.exists(!_.deterministic)) {
+          join.condition.get.failAnalysis(
+            errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+              "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC",
+            messageParameters = Map("condition" -> join.condition.get.sql))
+        }
+        // Validate to make sure the correlations appearing in the query are 
valid and
+        // allowed by spark.
+        checkCorrelationsInSubquery(expr.plan, isLateral = true)
+
+      case _: FunctionTableSubqueryArgumentExpression =>
+        // Do nothing here, since we will check for this pattern later.
+
+      case inSubqueryOrExistsSubquery =>
+        plan match {
+          case _: Filter | _: SupportsSubquery | _: Join |
+            _: Project | _: Aggregate | _: Window => // Ok
+          case _ =>
+            expr.failAnalysis(
+              errorClass =
+                
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_IN_EXISTS_SUBQUERY",
+              messageParameters = Map("treeNode" -> planToString(plan)))
+        }
+        // Validate to make sure the correlations appearing in the query are 
valid and
+        // allowed by spark.
+        checkCorrelationsInSubquery(expr.plan)
+    }
+  }
+
+  /**
+   * Validates to make sure the outer references appearing inside the subquery
+   * are allowed.
+   */
+  private def checkCorrelationsInSubquery(
+      sub: LogicalPlan,
+      isScalar: Boolean = false,
+      isLateral: Boolean = false): Unit = {
+    // Some query shapes are only supported with the DecorrelateInnerQuery 
framework.
+    // Support for Exists and IN subqueries is subject to a separate config 
flag
+    // 'decorrelateInnerQueryEnabledForExistsIn'.
+    val usingDecorrelateInnerQueryFramework =
+      (SQLConf.get.decorrelateInnerQueryEnabledForExistsIn || isScalar || 
isLateral) &&
+        SQLConf.get.decorrelateInnerQueryEnabled
+
+    // Validate that correlated aggregate expression do not contain a mixture
+    // of outer and local references.
+    def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
+      expr.foreach {
+        case a: AggregateExpression if containsOuter(a) =>
+          if (a.references.nonEmpty) {
+            a.failAnalysis(
+              errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+                "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
+              messageParameters = Map("function" -> a.sql))
+          }
+        case _ =>
+      }
+    }
+
+    // Make sure expressions of a plan do not contain outer references.
+    def failOnOuterReferenceInPlan(p: LogicalPlan): Unit = {
+      if (p.expressions.exists(containsOuter)) {
+        p.failAnalysis(
+          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+            "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+          messageParameters = Map("treeNode" -> planToString(p)))
+      }
+    }
+
+    // Check whether the logical plan node can host outer references.
+    // A `Project` can host outer references if it is inside a scalar or a 
lateral subquery and
+    // DecorrelateInnerQuery is enabled. Otherwise, only Filter can only outer 
references.
+    def canHostOuter(plan: LogicalPlan): Boolean = plan match {
+      case _: Filter => true
+      case _: Project => usingDecorrelateInnerQueryFramework
+      case _: Join => usingDecorrelateInnerQueryFramework
+      case _ => false
+    }
+
+    // Make sure a plan's expressions do not contain :
+    // 1. Aggregate expressions that have mixture of outer and local 
references.
+    // 2. Expressions containing outer references on plan nodes other than 
allowed operators.
+    def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
+      p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
+      val exprs = stripOuterReferences(p.expressions.filter(expr => 
containsOuter(expr)))
+      if (!canHostOuter(p) && !exprs.isEmpty) {
+        p.failAnalysis(
+          errorClass =
+            "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE",
+          messageParameters = Map("sqlExprs" -> 
exprs.map(toSQLExpr).mkString(",")))
+      }
+    }
+
+    // SPARK-17348: A potential incorrect result case.
+    // When a correlated predicate is a non-equality predicate,
+    // certain operators are not permitted from the operator
+    // hosting the correlated predicate up to the operator on the outer table.
+    // Otherwise, the pull up of the correlated predicate
+    // will generate a plan with a different semantics
+    // which could return incorrect result.
+    // Currently we check for Aggregate and Window operators
+    //
+    // Below shows an example of a Logical Plan during Analyzer phase that
+    // show this problem. Pulling the correlated predicate [outer(c2#77) >= ..]
+    // through the Aggregate (or Window) operator could alter the result of
+    // the Aggregate.
+    //
+    // Project [c1#76]
+    // +- Project [c1#87, c2#88]
+    // :  (Aggregate or Window operator)
+    // :  +- Filter [outer(c2#77) >= c2#88)]
+    // :     +- SubqueryAlias t2, `t2`
+    // :        +- Project [_1#84 AS c1#87, _2#85 AS c2#88]
+    // :           +- LocalRelation [_1#84, _2#85]
+    // +- SubqueryAlias t1, `t1`
+    // +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
+    // +- LocalRelation [_1#73, _2#74]
+    // SPARK-35080: The same issue can happen to correlated equality 
predicates when
+    // they do not guarantee one-to-one mapping between inner and outer 
attributes.
+    // For example:
+    // Table:
+    //   t1(a, b): [(0, 6), (1, 5), (2, 4)]
+    //   t2(c): [(6)]
+    //
+    // Query:
+    //   SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
+    //
+    // Original subquery plan:
+    //   Aggregate [count(1)]
+    //   +- Filter ((a + b) = outer(c))
+    //      +- LocalRelation [a, b]
+    //
+    // Plan after pulling up correlated predicates:
+    //   Aggregate [a, b] [count(1), a, b]
+    //   +- LocalRelation [a, b]
+    //
+    // Plan after rewrite:
+    //   Project [c1, count(1)]
+    //   +- Join LeftOuter ((a + b) = c)
+    //      :- LocalRelation [c]
+    //      +- Aggregate [a, b] [count(1), a, b]
+    //         +- LocalRelation [a, b]
+    //
+    // The right hand side of the join transformed from the subquery will 
output
+    //   count(1) | a | b
+    //      1     | 0 | 6
+    //      1     | 1 | 5
+    //      1     | 2 | 4
+    // and the plan after rewrite will give the original query incorrect 
results.
+    def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: 
LogicalPlan): Unit = {
+      // Correlated non-equality predicates are only supported with the 
decorrelate
+      // inner query framework. Currently we only use this new framework for 
scalar
+      // and lateral subqueries.
+      val allowNonEqualityPredicates = usingDecorrelateInnerQueryFramework
+      if (!allowNonEqualityPredicates && predicates.nonEmpty) {
+        // Report a non-supported case as an exception
+        p.failAnalysis(
+          errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
+            "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
+          messageParameters =
+            Map("treeNode" -> 
s"${exprsToString(predicates)}\n${planToString(p)}"))
+      }
+    }
+
+    // Recursively check invalid outer references in the plan.
+    def checkPlan(
+        plan: LogicalPlan,
+        aggregated: Boolean = false,
+        canContainOuter: Boolean = true): Unit = {
+
+      if (!canContainOuter) {
+        failOnOuterReferenceInPlan(plan)
+      }
+
+      // Approve operators allowed in a correlated subquery
+      // There are 4 categories:
+      // 1. Operators that are allowed anywhere in a correlated subquery, and,
+      //    by definition of the operators, they either do not contain
+      //    any columns or cannot host outer references.
+      // 2. Operators that are allowed anywhere in a correlated subquery
+      //    so long as they do not host outer references.
+      // 3. Operators that need special handling. These operators are
+      //    Filter, Join, Aggregate, and Generate.
+      //
+      // Any operators that are not in the above list are allowed
+      // in a correlated subquery only if they are not on a correlation path.
+      // In other word, these operators are allowed only under a correlation 
point.
+      //
+      // A correlation path is defined as the sub-tree of all the operators 
that
+      // are on the path from the operator hosting the correlated expressions
+      // up to the operator producing the correlated values.
+      plan match {
+        // Category 1:
+        // ResolvedHint, LeafNode, Repartition, and SubqueryAlias
+        case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _: 
SubqueryAlias) =>
+          p.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter))
+
+        case p @ (_ : Union | _: SetOperation) =>
+          // Set operations (e.g. UNION) containing correlated values are only 
supported
+          // with DecorrelateInnerQuery framework.
+          val childCanContainOuter = (canContainOuter
+            && usingDecorrelateInnerQueryFramework
+            && SQLConf.get.getConf(SQLConf.DECORRELATE_SET_OPS_ENABLED))
+          p.children.foreach(child => checkPlan(child, aggregated, 
childCanContainOuter))
+
+        // Category 2:
+        // These operators can be anywhere in a correlated subquery.
+        // so long as they do not host outer references in the operators.
+        case p: Project =>
+          failOnInvalidOuterReference(p)
+          checkPlan(p.child, aggregated, canContainOuter)
+
+        case s: Sort =>
+          failOnInvalidOuterReference(s)
+          checkPlan(s.child, aggregated, canContainOuter)
+
+        case r: RepartitionByExpression =>
+          failOnInvalidOuterReference(r)
+          checkPlan(r.child, aggregated, canContainOuter)
+
+        case l: LateralJoin =>
+          failOnInvalidOuterReference(l)
+          checkPlan(l.child, aggregated, canContainOuter)
+
+        // Category 3:
+        // Filter is one of the two operators allowed to host correlated 
expressions.
+        // The other operator is Join. Filter can be anywhere in a correlated 
subquery.
+        case f: Filter =>
+          failOnInvalidOuterReference(f)
+          val (correlated, _) = 
splitConjunctivePredicates(f.condition).partition(containsOuter)
+          val unsupportedPredicates = 
correlated.filterNot(DecorrelateInnerQuery.canPullUpOverAgg)
+          if (aggregated) {
+            failOnUnsupportedCorrelatedPredicate(unsupportedPredicates, f)
+          }
+          checkPlan(f.child, aggregated, canContainOuter)
+
+        // Aggregate cannot host any correlated expressions
+        // It can be on a correlation path if the correlation contains
+        // only supported correlated equality predicates.
+        // It cannot be on a correlation path if the correlation has
+        // non-equality correlated predicates.
+        case a: Aggregate =>
+          failOnInvalidOuterReference(a)
+          checkPlan(a.child, aggregated = true, canContainOuter)
+
+        // Same as Aggregate above.
+        case w: Window =>
+          failOnInvalidOuterReference(w)
+          checkPlan(w.child, aggregated = true, canContainOuter)
+
+        // Distinct does not host any correlated expressions, but during the 
optimization phase
+        // it will be rewritten as Aggregate, which can only be on a 
correlation path if the
+        // correlation contains only the supported correlated equality 
predicates.
+        // Only block it for lateral subqueries because scalar subqueries must 
be aggregated
+        // and it does not impact the results for IN/EXISTS subqueries.
+        case d: Distinct =>
+          checkPlan(d.child, aggregated = isLateral, canContainOuter)
+
+        // Join can host correlated expressions.
+        case j @ Join(left, right, joinType, _, _) =>
+          failOnInvalidOuterReference(j)
+          joinType match {
+            // Inner join, like Filter, can be anywhere.
+            case _: InnerLike =>
+              j.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter))
+
+            // Left outer join's right operand cannot be on a correlation path.
+            // LeftAnti and ExistenceJoin are special cases of LeftOuter.
+            // Note that ExistenceJoin cannot be expressed externally in both 
SQL and DataFrame
+            // so it should not show up here in Analysis phase. This is just a 
safety net.
+            //
+            // LeftSemi does not allow output from the right operand.
+            // Any correlated references in the subplan
+            // of the right operand cannot be pulled up.
+            case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
+              checkPlan(left, aggregated, canContainOuter)
+              checkPlan(right, aggregated, canContainOuter = false)
+
+            // Likewise, Right outer join's left operand cannot be on a 
correlation path.
+            case RightOuter =>
+              checkPlan(left, aggregated, canContainOuter = false)
+              checkPlan(right, aggregated, canContainOuter)
+
+            // Any other join types not explicitly listed above,
+            // including Full outer join, are treated as Category 4.
+            case _ =>
+              j.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter = false))
+          }
+
+        // Generator with join=true, i.e., expressed with
+        // LATERAL VIEW [OUTER], similar to inner join,
+        // allows to have correlation under it
+        // but must not host any outer references.
+        // Note:
+        // Generator with requiredChildOutput.isEmpty is treated as Category 4.
+        case g: Generate if g.requiredChildOutput.nonEmpty =>
+          failOnInvalidOuterReference(g)
+          checkPlan(g.child, aggregated, canContainOuter)
+
+        // Correlated subquery can have a LIMIT clause
+        case l @ Limit(_, input) =>
+          failOnInvalidOuterReference(l)
+          checkPlan(
+            input,
+            aggregated,
+            canContainOuter && 
SQLConf.get.getConf(SQLConf.DECORRELATE_LIMIT_ENABLED))
+
+        case o @ Offset(_, input) =>
+          failOnInvalidOuterReference(o)
+          checkPlan(
+            input,
+            aggregated,
+            canContainOuter && 
SQLConf.get.getConf(SQLConf.DECORRELATE_OFFSET_ENABLED))
+
+        // We always inline CTE relations before analysis check, and only 
un-referenced CTE
+        // relations will be kept in the plan. Here we should simply skip them 
and check the
+        // children, as un-referenced CTE relations won't be executed anyway 
and doesn't need to
+        // be restricted by the current subquery correlation limitations.
+        case _: WithCTE | _: CTERelationDef =>
+          plan.children.foreach(p => checkPlan(p, aggregated, canContainOuter))
+
+        // Category 4: Any other operators not in the above 3 categories
+        // cannot be on a correlation path, that is they are allowed only
+        // under a correlation point but they and their descendant operators
+        // are not allowed to have any correlated expressions.
+        case p =>
+          p.children.foreach(p => checkPlan(p, aggregated, canContainOuter = 
false))
+      }
+    }
+
+    // Simplify the predicates before validating any unsupported correlation 
patterns in the plan.
+    AnalysisHelper.allowInvokingTransformsInAnalyzer {
+      checkPlan(BooleanSimplification(sub))
+    }
+  }
+
+  private def exprsToString(exprs: Seq[Expression]): String = {
+    val result = exprs.map(_.toString).mkString("\n")
+    if (Utils.isTesting) scrubOutIds(result) else result
+  }
+}


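As a small, self-contained illustration of the scrubOutIds normalization that the new
PlanToString trait above provides (the sample plan string below is made up and not
produced by Spark), the three regex rewrites replace per-run identifiers with stable
placeholders so test output stays deterministic:

    // Sketch only: reproduces the same regex pipeline as PlanToString.scrubOutIds
    // on a hypothetical plan fragment.
    object ScrubExample extends App {
      val raw = "Filter (a#12 > rand(42)), operator id = 7"
      val scrubbed = raw
        .replaceAll("#\\d+", "#x")                            // expression IDs  -> #x
        .replaceAll("operator id = \\d+", "operator id = #x") // operator IDs    -> #x
        .replaceAll("rand\\(-?\\d+\\)", "rand(number)")       // rand(...) seeds -> rand(number)
      println(scrubbed) // prints: Filter (a#x > rand(number)), operator id = #x
    }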