This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b9ee41f [SPARK-34922][SQL][3.0] Use a relative cost comparison function in the CBO
b9ee41f is described below
commit b9ee41fa9957631ca0f859ee928358c108fbd9a9
Author: Tanel Kiis <[email protected]>
AuthorDate: Thu Apr 8 11:03:59 2021 +0900
[SPARK-34922][SQL][3.0] Use a relative cost comparison function in the CBO
### What changes were proposed in this pull request?
Changed the cost comparison function of the CBO to use the ratios of row
counts and sizes in bytes.
### Why are the changes needed?
In #30965 we changed the CBO cost comparison function so that it would be
"symmetric": `A.betterThan(B)` now implies that `!B.betterThan(A)`.
With that we caused performance regressions in some queries - TPCDS q19,
for example.
The original cost comparison function used the ratios `relativeRows =
A.rowCount / B.rowCount` and `relativeSize = A.size / B.size`. The changed
function compared "absolute" cost values `costA = w*A.rowCount + (1-w)*A.size`
and `costB = w*B.rowCount + (1-w)*B.size`.
Based on the input from wzhfy we decided to go back to the relative values,
because otherwise one (size) may overwhelm the other (rowCount). But this time
we avoid adding up the ratios.
Originally `A.betterThan(B) => w*relativeRows + (1-w)*relativeSize < 1` was
used. Besides being "non-symmetric", this can also let one ratio overwhelm the
other. For `w=0.5`, if plan `A` is at least 2x larger than `B` in size (bytes),
then no matter how many times more rows the `B` plan has, `B` will always be
considered better - `0.5*2 + 0.5*0.00000000000001 > 1`.
When working with ratios, it is better to multiply them. The proposed cost
comparison function is: `A.betterThan(B) =>
relativeRows^w * relativeSize^(1-w) < 1`.
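To make the failure mode concrete, here is a small self-contained sketch (illustrative values only, not Spark code; `CostComparisonDemo` and its numbers are made up to reproduce the `w=0.5` example above):

```scala
object CostComparisonDemo extends App {
  val w = 0.5

  // Hypothetical plan costs as (rowCount, sizeInBytes):
  val (aRows, aSize) = (1.0, 200.0)    // plan A: few rows, 2x the bytes of B
  val (bRows, bSize) = (1.0e13, 100.0) // plan B: far more rows, half the bytes

  val relativeRows = aRows / bRows // ~1e-13
  val relativeSize = aSize / bSize // 2.0

  // Additive form: the size ratio alone keeps the sum above 1,
  // so A can never win, however extreme the row ratio is.
  val additive = w * relativeRows + (1 - w) * relativeSize
  println(s"additive = $additive, A better? ${additive < 1}") // false

  // Multiplicative form (weighted geometric mean): the tiny row ratio
  // compensates for the 2x size ratio, so A wins.
  val multiplicative = math.pow(relativeRows, w) * math.pow(relativeSize, 1 - w)
  println(s"multiplicative = $multiplicative, A better? ${multiplicative < 1}") // true
}
```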
### Does this PR introduce _any_ user-facing change?
Comparison of the changed TPCDS v1.4 query execution times at sf=10:
| query | absolute | multiplicative | change | additive | change |
| -- | -- | -- | -- | -- | -- |
| q12 | 145 | 137 | -5.52% | 141 | -2.76% |
| q13 | 264 | 271 | 2.65% | 271 | 2.65% |
| q17 | 4521 | 4243 | -6.15% | 4348 | -3.83% |
| q18 | 758 | 466 | -38.52% | 480 | -36.68% |
| q19 | 38503 | 2167 | -94.37% | 2176 | -94.35% |
| q20 | 119 | 120 | 0.84% | 126 | 5.88% |
| q24a | 16429 | 16838 | 2.49% | 17103 | 4.10% |
| q24b | 16592 | 16999 | 2.45% | 17268 | 4.07% |
| q25 | 3558 | 3556 | -0.06% | 3675 | 3.29% |
| q33 | 362 | 361 | -0.28% | 380 | 4.97% |
| q52 | 1020 | 1032 | 1.18% | 1052 | 3.14% |
| q55 | 927 | 938 | 1.19% | 961 | 3.67% |
| q72 | 24169 | 13377 | -44.65% | 24306 | 0.57% |
| q81 | 1285 | 1185 | -7.78% | 1168 | -9.11% |
| q91 | 324 | 336 | 3.70% | 337 | 4.01% |
| q98 | 126 | 129 | 2.38% | 131 | 3.97% |
All times are in ms; the change is relative to the current state of the master
branch (the "absolute" column).
The proposed cost function (multiplicative) significantly improves the
performance on q18, q19 and q72. The original cost function (additive) shows
similar improvements on q18 and q19. All other changes are within the error
bars and I would ignore them - perhaps q81 has also improved.
### How was this patch tested?
PlanStabilitySuite
Closes #32076 from tanelk/SPARK-34922_cbo_better_cost_function_3.0.
Lead-authored-by: Tanel Kiis <[email protected]>
Co-authored-by: [email protected] <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
---
.../catalyst/optimizer/CostBasedJoinReorder.scala | 28 ++++++++++++++++++----
.../org/apache/spark/sql/internal/SQLConf.scala | 6 +++--
.../sql/catalyst/optimizer/JoinReorderSuite.scala | 3 ---
.../optimizer/StarJoinCostBasedReorderSuite.scala | 9 +++----
4 files changed, 32 insertions(+), 14 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 93c608dc..ed7d92e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -343,12 +343,30 @@ object JoinReorderDP extends PredicateHelper with Logging {
       }
     }
 
+    /**
+     * To identify the plan with smaller computational cost,
+     * we use the weighted geometric mean of ratio of rows and the ratio of sizes in bytes.
+     *
+     * There are other ways to combine these values as a cost comparison function.
+     * Some of these, that we have experimented with, but have gotten worse result,
+     * than with the current one:
+     * 1) Weighted arithmetic mean of these two ratios - adding up fractions puts
+     *    less emphasis on ratios between 0 and 1. Ratios 10 and 0.1 should be considered
+     *    to be just as strong evidences in opposite directions. The arithmetic mean of these
+     *    would be heavily biased towards the 10.
+     * 2) Absolute cost (cost = weight * rowCount + (1 - weight) * size) - when adding up
+     *    two numeric measurements that have different units we can easily end up with one
+     *    overwhelming the other.
+     */
     def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
-      val thisCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight +
-        BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
-      val otherCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight +
-        BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
-      thisCost < otherCost
+      if (other.planCost.card == 0 || other.planCost.size == 0) {
+        false
+      } else {
+        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
+        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
+        Math.pow(relativeRows.doubleValue, conf.joinReorderCardWeight) *
+          Math.pow(relativeSize.doubleValue, 1 - conf.joinReorderCardWeight) < 1
+      }
     }
   }
 }
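One observation on the new `betterThan` above: the zero-cost guard is needed because the ratios are undefined when the other plan reports zero rows or zero bytes; in that case the method conservatively answers that the current plan is not better.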
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 85ac7b8..ce0459e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1664,8 +1664,10 @@ object SQLConf {
   val JOIN_REORDER_CARD_WEIGHT =
     buildConf("spark.sql.cbo.joinReorder.card.weight")
       .internal()
-      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
-        "rows * weight + size * (1 - weight).")
+      .doc("The weight of the ratio of cardinalities (number of rows) " +
+        "in the cost comparison function. The ratio of sizes in bytes has weight " +
+        "1 - this value. The weighted geometric mean of these ratios is used to decide " +
+        "which of the candidate plans will be chosen by the CBO.")
       .version("2.2.0")
       .doubleConf
       .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
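As a usage sketch (not part of this commit; the `spark.sql.cbo.*` keys are existing Spark confs, while the weight value 0.8 is an arbitrary illustration, since this internal key defaults to 0.7):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session; 0.8 is an arbitrary weight chosen for illustration.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cbo-card-weight-demo")
  .config("spark.sql.cbo.enabled", "true")                // enable cost-based optimization
  .config("spark.sql.cbo.joinReorder.enabled", "true")    // enable CBO join reordering
  .config("spark.sql.cbo.joinReorder.card.weight", "0.8") // the weight w discussed above
  .getOrCreate()
```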
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 75fe3dd..5dca7de 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -391,9 +391,6 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     val plan1 = JoinPlan(null, null, null, Cost(300, 80))
     val plan2 = JoinPlan(null, null, null, Cost(500, 30))
 
-    // cost1 = 300*0.7 + 80*0.3 = 234
-    // cost2 = 500*0.7 + 30*0.3 = 359
-
     assert(!plan1.betterThan(plan1, conf))
     assert(!plan2.betterThan(plan2, conf))
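For intuition, a hand computation (not part of the commit) with the default weight `w=0.7` used here: `relativeRows = 300/500 = 0.6`, `relativeSize = 80/30 ≈ 2.67`, and `0.6^0.7 * 2.67^0.3 ≈ 0.94 < 1`, so the new function, like the removed additive comments (`234 < 359`), still considers plan1 better than plan2.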
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index b7cf383..391f67e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -293,12 +293,13 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
       (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
 
     val expected =
-      t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
+      f1
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t4_c1")))
         .join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
           Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
-        .join(f1
-          .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
-          .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
         .select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)
 
     assertEqualPlans(query, expected)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]