This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d57136cf3cf5 [SPARK-54673][SQL] Refactor SQL pipe syntax analysis code for sharing and reuse
d57136cf3cf5 is described below
commit d57136cf3cf5725e17e6202cc8a91a5cdda170cc
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Dec 11 11:27:41 2025 -0800
[SPARK-54673][SQL] Refactor SQL pipe syntax analysis code for sharing and reuse
### What changes were proposed in this pull request?
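This PR refactors the SQL pipe syntax analysis code for sharing and reuse: the validation logic of the `ValidateAndStripPipeExpressions` rule is extracted into a new method, `validateAndStripPipeExpression`, which the rule now calls.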
### Why are the changes needed?
The new single-pass analyzer (resolver) can call the extracted method instead of duplicating the validation logic.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
N/A: this is a small, behavior-preserving refactoring only.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53431 from dtenedor/refactor-pipe-syntax.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../sql/catalyst/expressions/pipeOperators.scala | 52 +++++++++++++++-------
1 file changed, 35 insertions(+), 17 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
index b2bb949c9e5e..cfbd403d66fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
@@ -71,26 +71,44 @@ case object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] {
case node: LogicalPlan =>
node.resolveExpressions {
case p: PipeExpression if p.child.resolved =>
- // Once the child expression is resolved, we can perform the
necessary invariant checks
- // and then remove this expression, replacing it with the child
expression instead.
- val firstAggregateFunction: Option[AggregateFunction] =
findFirstAggregate(p.child)
- if (p.isAggregate && firstAggregateFunction.isEmpty) {
- throw QueryCompilationErrors
-
.pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child)
- } else if (!p.isAggregate) {
- // For non-aggregate clauses, only allow aggregate functions in
SELECT.
- // All other clauses (EXTEND, SET, etc.) disallow aggregates.
- val aggregateAllowed = p.clause == PipeOperators.selectClause
- if (!aggregateAllowed) {
- firstAggregateFunction.foreach { a =>
- throw
QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause)
- }
- }
- }
- p.child
+ validateAndStripPipeExpression(p, p.child)
}
}
+  /**
+   * Validates aggregate function constraints for a [[PipeExpression]] and returns the resolved
+   * child expression (stripping the [[PipeExpression]] wrapper).
+   *
+   * This method is shared between the fixed-point analyzer rule and the single-pass resolver.
+   * Only `pipeExpression.isAggregate` and `pipeExpression.clause` are read here; the expression
+   * that is validated and returned is the caller-supplied `resolvedChild`.
+   *
+   * Validation fails with an error built by [[QueryCompilationErrors]] when either:
+   *  - `pipeExpression.isAggregate` is true but `resolvedChild` contains no aggregate function
+   *    (`pipeOperatorAggregateExpressionContainsNoAggregateFunction`); or
+   *  - `pipeExpression.isAggregate` is false, the clause is not `PipeOperators.selectClause`,
+   *    and `resolvedChild` contains an aggregate function
+   *    (`pipeOperatorContainsAggregateFunction`, reporting the first aggregate function found).
+   *
+   * @param pipeExpression The [[PipeExpression]] containing metadata about the pipe clause.
+   * @param resolvedChild The resolved child expression to validate and return.
+   * @return The resolved child expression after validation.
+   */
+  def validateAndStripPipeExpression(
+      pipeExpression: PipeExpression,
+      resolvedChild: Expression): Expression = {
+    // Looked up once and reused by both checks below.
+    val firstAggregateFunction: Option[AggregateFunction] = findFirstAggregate(resolvedChild)
+    if (pipeExpression.isAggregate && firstAggregateFunction.isEmpty) {
+      throw QueryCompilationErrors
+        .pipeOperatorAggregateExpressionContainsNoAggregateFunction(resolvedChild)
+    }
+    if (!pipeExpression.isAggregate) {
+      // For non-aggregate clauses, only allow aggregate functions in SELECT.
+      // All other clauses (EXTEND, SET, etc.) disallow aggregates.
+      val aggregateAllowed = pipeExpression.clause == PipeOperators.selectClause
+      if (!aggregateAllowed) {
+        // Throws on the first aggregate function found, if any; otherwise a no-op.
+        firstAggregateFunction.foreach { a =>
+          throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(
+            a,
+            pipeExpression.clause
+          )
+        }
+      }
+    }
+    resolvedChild
+  }
+
/** Returns the first aggregate function in the given expression, or None if
not found. */
private def findFirstAggregate(e: Expression): Option[AggregateFunction] = e
match {
case a: AggregateFunction =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]