This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d8a6075dd617 [SPARK-50380][SQL] ReorderAssociativeOperator should
respect the contract in ConstantFolding
d8a6075dd617 is described below
commit d8a6075dd61748c88733c4964ba37ed2430dc671
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Nov 21 14:30:19 2024 -0800
[SPARK-50380][SQL] ReorderAssociativeOperator should respect the contract
in ConstantFolding
### What changes were proposed in this pull request?
This PR fixes a long-standing issue in `ReorderAssociativeOperator`. In
this rule, we flatten the Add/Multiply nodes, combine the foldable operands
into a single Add/Multiply, and then evaluate it into a literal. This is
normally fine, but https://github.com/apache/spark/pull/36468 added a new
contract to `ConstantFolding`: with the introduction of ANSI mode, we do not
want to fail eagerly for expressions within conditional branches.
`ReorderAssociativeOperator` does not follow this contract [...]
The solution in this PR is to leave the expression evaluation to
`ConstantFolding`. `ReorderAssociativeOperator` should only match literals.
This makes sure that the early expression evaluation follows all the contracts
in `ConstantFolding`.
### Why are the changes needed?
Avoid failing queries that should not fail. This also fixes a regression
caused by https://github.com/apache/spark/pull/48395 , which did not introduce
the bug but made it more likely to occur.
### Does this PR introduce _any_ user-facing change?
Yes, queries that previously failed incorrectly can now run.
### How was this patch tested?
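Added a new test, "SPARK-50380: conditional branches with error expression", to `ReorderAssociativeOperatorSuite`.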
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #48918 from cloud-fan/error.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/optimizer/expressions.scala | 36 ++++++++++++----------
.../ReorderAssociativeOperatorSuite.scala | 20 ++++++++++--
2 files changed, 37 insertions(+), 19 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 3eb7eb6e6b2e..754fea85ec6d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -249,6 +249,11 @@ object ReorderAssociativeOperator extends
Rule[LogicalPlan] {
case _ => ExpressionSet(Seq.empty)
}
+ private def isSameInteger(expr: Expression, value: Int): Boolean = expr
match {
+ case l: Literal => l.value == value
+ case _ => false
+ }
+
def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
_.containsPattern(BINARY_ARITHMETIC), ruleId) {
case q: LogicalPlan =>
@@ -259,32 +264,31 @@ object ReorderAssociativeOperator extends
Rule[LogicalPlan] {
val groupingExpressionSet = collectGroupingExpressions(q)
q.transformExpressionsDownWithPruning(_.containsPattern(BINARY_ARITHMETIC)) {
case a @ Add(_, _, f) if a.deterministic &&
a.dataType.isInstanceOf[IntegralType] =>
- val (foldables, others) = flattenAdd(a,
groupingExpressionSet).partition(_.foldable)
- if (foldables.nonEmpty) {
- val foldableExpr = foldables.reduce((x, y) => Add(x, y, f))
- val foldableValue = foldableExpr.eval(EmptyRow)
+ val (literals, others) = flattenAdd(a, groupingExpressionSet)
+ .partition(_.isInstanceOf[Literal])
+ if (literals.nonEmpty) {
+ val literalExpr = literals.reduce((x, y) => Add(x, y, f))
if (others.isEmpty) {
- Literal.create(foldableValue, a.dataType)
- } else if (foldableValue == 0) {
+ literalExpr
+ } else if (isSameInteger(literalExpr, 0)) {
others.reduce((x, y) => Add(x, y, f))
} else {
- Add(others.reduce((x, y) => Add(x, y, f)),
Literal.create(foldableValue, a.dataType), f)
+ Add(others.reduce((x, y) => Add(x, y, f)), literalExpr, f)
}
} else {
a
}
case m @ Multiply(_, _, f) if m.deterministic &&
m.dataType.isInstanceOf[IntegralType] =>
- val (foldables, others) = flattenMultiply(m,
groupingExpressionSet).partition(_.foldable)
- if (foldables.nonEmpty) {
- val foldableExpr = foldables.reduce((x, y) => Multiply(x, y, f))
- val foldableValue = foldableExpr.eval(EmptyRow)
- if (others.isEmpty || (foldableValue == 0 && !m.nullable)) {
- Literal.create(foldableValue, m.dataType)
- } else if (foldableValue == 1) {
+ val (literals, others) = flattenMultiply(m, groupingExpressionSet)
+ .partition(_.isInstanceOf[Literal])
+ if (literals.nonEmpty) {
+ val literalExpr = literals.reduce((x, y) => Multiply(x, y, f))
+ if (others.isEmpty || (isSameInteger(literalExpr, 0) &&
!m.nullable)) {
+ literalExpr
+ } else if (isSameInteger(literalExpr, 1)) {
others.reduce((x, y) => Multiply(x, y, f))
} else {
- Multiply(others.reduce((x, y) => Multiply(x, y, f)),
- Literal.create(foldableValue, m.dataType), f)
+ Multiply(others.reduce((x, y) => Multiply(x, y, f)), literalExpr,
f)
}
} else {
m
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
index 9090e0c7fc10..7733e58547fe 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
@@ -29,7 +29,8 @@ class ReorderAssociativeOperatorSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
- Batch("ReorderAssociativeOperator", Once,
+ Batch("ReorderAssociativeOperator", FixedPoint(10),
+ ConstantFolding,
ReorderAssociativeOperator) :: Nil
}
@@ -44,7 +45,7 @@ class ReorderAssociativeOperatorSuite extends PlanTest {
($"b" + 1) * 2 * 3 * 4,
$"a" + 1 + $"b" + 2 + $"c" + 3,
$"a" + 1 + $"b" * 2 + $"c" + 3,
- Rand(0) * 1 * 2 * 3 * 4)
+ Rand(0) * 1.0 * 2.0 * 3.0 * 4.0)
val optimized = Optimize.execute(originalQuery.analyze)
@@ -56,7 +57,7 @@ class ReorderAssociativeOperatorSuite extends PlanTest {
(($"b" + 1) * 24).as("((((b + 1) * 2) * 3) * 4)"),
($"a" + $"b" + $"c" + 6).as("(((((a + 1) + b) + 2) + c) + 3)"),
($"a" + $"b" * 2 + $"c" + 4).as("((((a + 1) + (b * 2)) + c) + 3)"),
- Rand(0) * 1 * 2 * 3 * 4)
+ Rand(0) * 1.0 * 2.0 * 3.0 * 4.0)
.analyze
comparePlans(optimized, correctAnswer)
@@ -106,4 +107,17 @@ class ReorderAssociativeOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+
+ test("SPARK-50380: conditional branches with error expression") {
+ val originalQuery1 = testRelation.select(If($"a" === 1, 1L,
Literal(1).div(0) + $"b")).analyze
+ val optimized1 = Optimize.execute(originalQuery1)
+ comparePlans(optimized1, originalQuery1)
+
+ val originalQuery2 = testRelation.select(
+ If($"a" === 1, 1, ($"b" + Literal(Int.MaxValue)) + 1).as("col")).analyze
+ val optimized2 = Optimize.execute(originalQuery2)
+ val correctAnswer2 = testRelation.select(
+ If($"a" === 1, 1, $"b" + (Literal(Int.MaxValue) + 1)).as("col")).analyze
+ comparePlans(optimized2, correctAnswer2)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]