This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8d0800a [SPARK-31201][SQL] Add an individual config for skewed
partition threshold
8d0800a is described below
commit 8d0800a0803d3c47938bddefa15328d654739bc5
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Mar 26 22:57:01 2020 +0900
[SPARK-31201][SQL] Add an individual config for skewed partition threshold
Skew join handling comes with an overhead: we need to read some data
repeatedly. We should therefore treat a partition as skewed only if it is large
enough for this overhead to pay off.
Currently the size threshold is the advisory partition size, which is 64 MB
by default. This is not large enough for the skewed partition size threshold.
This PR adds a new config for the threshold and sets its default value to 256 MB.
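Why are the changes needed?
To avoid skew join handling that may introduce a perf regression.
Does this PR introduce any user-facing change?
no
How was this patch tested?
existing tests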
Closes #27967 from cloud-fan/aqe.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 05498af72e19b058b210815e1053f3fa9b0157d9)
Signed-off-by: HyukjinKwon <[email protected]>
---
docs/sql-performance-tuning.md | 9 ++++++++-
.../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++-
.../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 2 +-
.../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 +-
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md
index 489575d..9a1cc89 100644
--- a/docs/sql-performance-tuning.md
+++ b/docs/sql-performance-tuning.md
@@ -242,7 +242,14 @@ Data skew can severely downgrade the performance of join
queries. This feature d
<td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
<td>10</td>
<td>
- A partition is considered as skewed if its size is larger than this
factor multiplying the median partition size and also larger than
<code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
+ A partition is considered as skewed if its size is larger than this
factor multiplying the median partition size and also larger than
<code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code>.
+ </td>
+ </tr>
+ <tr>
+
<td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
+ <td>256MB</td>
+ <td>
+ A partition is considered as skewed if its size in bytes is larger
than this threshold and also larger than
<code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplying the
median partition size. Ideally this config should be set larger than
<code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
</td>
</tr>
</table>
\ No newline at end of file
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1b00bed..c61a57e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -455,11 +455,20 @@ object SQLConf {
buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
.doc("A partition is considered as skewed if its size is larger than
this factor " +
"multiplying the median partition size and also larger than " +
- s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
+ "'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'")
.intConf
.checkValue(_ > 0, "The skew factor must be positive.")
.createWithDefault(10)
+ val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD =
+ buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
+ .doc("A partition is considered as skewed if its size in bytes is larger
than this " +
+ s"threshold and also larger than
'${SKEW_JOIN_SKEWED_PARTITION_FACTOR.key}' " +
+ "multiplying the median partition size. Ideally this config should be
set larger " +
+ s"than '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("256MB")
+
val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.internal()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index e02b9af..b09e563 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -67,7 +67,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends
Rule[SparkPlan] {
*/
private def isSkewed(size: Long, medianSize: Long): Boolean = {
size > medianSize *
conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
- size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}
private def medianSize(stats: MapOutputStatistics): Long = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index fcca23d..d4c5b0d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -615,6 +615,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
withTempView("skewData1", "skewData2") {
spark
@@ -781,4 +782,3 @@ class AdaptiveQueryExecSuite
}
}
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]