This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a85450403e4 [SPARK-55461][SQL] Improve AQE Coalesce Grouping warning messages when numOfPartitions of ShuffleStages in the same coalesce group are not equal
8a85450403e4 is described below
commit 8a85450403e4c3a4a336bef9dad9946021dd6aba
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Wed Mar 4 21:03:22 2026 +0800
[SPARK-55461][SQL] Improve AQE Coalesce Grouping warning messages when numOfPartitions of ShuffleStages in the same coalesce group are not equal
### What changes were proposed in this pull request?
Spark's Adaptive Query Execution (AQE) framework coalesces eligible shuffle
partitions (e.g. empty or small shuffle partitions) during query execution.
This optimization requires creating coalesce groups for related shuffle stages
(e.g. a `SortMergeJoin` can have one `ShuffleQueryStage` per join leg) to
guarantee that both SMJ legs end up with the same number of partitions. To
create coalesce groups for related `ShuffleStages`, the Spark plan tree needs
to be traversed to find `Shuffl [...]
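The per-group invariant can be sketched as follows. This is a minimal standalone sketch with hypothetical names (`StageStats`, `CoalesceGroupCheck` are not Spark classes): a coalesce group is only eligible for coalescing when every shuffle stage in it reports the same number of partitions, and the details string mirrors the `shuffleId -> numOfPartitions` format this PR adds to the warning log.

```scala
// Hypothetical stand-in for a shuffle stage's map output statistics.
case class StageStats(shuffleStageId: Int, bytesByPartitionId: Array[Long])

object CoalesceGroupCheck {
  // Coalescing applies only when all stages agree on the partition count.
  def canCoalesce(group: Seq[StageStats]): Boolean =
    group.map(_.bytesByPartitionId.length).distinct.length <= 1

  // Details in the style of the warning message added by this PR.
  def details(group: Seq[StageStats]): String =
    group.map { s =>
      s"shuffleId: ${s.shuffleStageId} -> numOfPartitions: ${s.bytesByPartitionId.length}"
    }.mkString(", ")
}
```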
**1-** Adds a warning log message to
`ShufflePartitionsUtil.coalescePartitionsWithoutSkew()` when the numbers of
partitions of the `ShuffleStages` in the same coalesce group are not equal.
This is required for consistency because
`ShufflePartitionsUtil.coalescePartitionsWithSkew()` already logs a warning
message for the same case.
**2-** Adds the problematic `shuffleStageIds` to the warning messages when the
numbers of partitions of the `ShuffleStages` in the same coalesce group are not
equal. This information helps with troubleshooting.
**3-** Aligns the warning log format between
`ShufflePartitionsUtil.coalescePartitionsWithoutSkew()` and
`coalescePartitionsWithSkew()`.
**4-** Adds 2 new UT cases. The current UT cases cover the following use cases:
```
skewed SortMergeJoin under Union under BroadcastNestedLoopJoin (BNLJ),
skewed SortMergeJoin under Union under CartesianProduct
```
This PR also adds the following new UT cases for these legacy use cases:
**4.1-** skewed SortMergeJoin under Union under BroadcastHashJoin,
**4.2-** non-skewed SortMergeJoin under Union under BroadcastHashJoin
**5-** Renames `private def coalescePartitions()` because Scala does not allow
default argument values on multiple overloaded alternatives of the same method.
Otherwise it causes the following Scala compile-time error:
```
[ERROR] [Error] /Users/eren.avsarogullari/Development/OSS/ossspark/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:26:
in object ShufflePartitionsUtil, multiple overloaded alternatives of method
in object ShufflePartitionsUtil, multiple overloaded alternatives of method
coalescePartitions define default arguments.
```
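This restriction can be reproduced in isolation. The sketch below uses hypothetical method names and bodies (not the actual Spark helpers): two overloads of the same name that both define default arguments are rejected by scalac, while renaming one overload (as this PR does with `coalescePartitionsAndGetSpecs`) keeps the defaults and compiles.

```scala
object OverloadDemo {
  // This pair would NOT compile, failing with:
  //   "multiple overloaded alternatives of method coalesce define default arguments."
  //   def coalesce(sizes: Seq[Long], target: Long = 64L): Int = ...
  //   def coalesce(start: Int, end: Int, target: Long = 64L): Int = ...

  // Renaming one overload resolves the clash while keeping both defaults:
  def coalesce(sizes: Seq[Long], target: Long = 64L): Int =
    math.max(1, (sizes.sum / math.max(target, 1L)).toInt)

  def coalesceRange(start: Int, end: Int, target: Long = 64L): Int =
    math.max(1, ((end - start) / math.max(target, 1L)).toInt)
}
```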
### Why are the changes needed?
- The warning message is useful for troubleshooting when the numbers of
partitions of the `ShuffleStages` in the same coalesce group are not equal,
- The problematic `ShuffleStageIds` can also help with troubleshooting,
- The additional UT coverage is useful to verify the coalesce grouping logic
and avoid SPARK-46590-style issues.
### Does this PR introduce _any_ user-facing change?
Yes, a new warning message is logged when the `numOfPartitions` of the
`ShuffleStages` in the same coalesce group are not equal.
### How was this patch tested?
Added 2 new UT cases for existing use cases to test the coalesce grouping
logic:
- skewed SortMergeJoin under Union under BroadcastHashJoin,
- non-skewed SortMergeJoin under Union under BroadcastHashJoin
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54242 from erenavsarogullari/SPARK-55461.
Authored-by: Eren Avsarogullari <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../adaptive/CoalesceShufflePartitions.scala | 3 +-
.../execution/adaptive/ShufflePartitionsUtil.scala | 35 +++++++++----
.../execution/CoalesceShufflePartitionsSuite.scala | 57 +++++++++++++++++++++-
3 files changed, 83 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 3fdcb17bdeae..f172942f6ec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -109,7 +109,8 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
coalesceGroup.shuffleStages.map(_.partitionSpecs),
advisoryTargetSize = advisoryTargetSize,
minNumPartitions = minNumPartitions,
- minPartitionSize = minPartitionSize)
+ minPartitionSize = minPartitionSize,
+ coalesceGroup.shuffleStages.map(_.shuffleStage.id))
if (newPartitionSpecs.nonEmpty) {
coalesceGroup.shuffleStages.zip(newPartitionSpecs).map { case (stageInfo, partSpecs) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index b7cf0ce3150b..fc7d556a6bf7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -46,7 +46,8 @@ object ShufflePartitionsUtil extends Logging {
inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
advisoryTargetSize: Long,
minNumPartitions: Int,
- minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
+ minPartitionSize: Long,
+ shuffleStageIds: Seq[Int] = Seq.empty): Seq[Seq[ShufflePartitionSpec]] = {
assert(mapOutputStatistics.length == inputPartitionSpecs.length)
if (mapOutputStatistics.isEmpty) {
@@ -69,17 +70,18 @@ object ShufflePartitionsUtil extends Logging {
// If `inputPartitionSpecs` are all empty, it means skew join optimization is not applied.
if (inputPartitionSpecs.forall(_.isEmpty)) {
coalescePartitionsWithoutSkew(
- mapOutputStatistics, targetSize, minPartitionSize)
+ mapOutputStatistics, targetSize, minPartitionSize, shuffleStageIds)
} else {
coalescePartitionsWithSkew(
- mapOutputStatistics, inputPartitionSpecs, targetSize, minPartitionSize)
+ mapOutputStatistics, inputPartitionSpecs, targetSize, minPartitionSize, shuffleStageIds)
}
}
private def coalescePartitionsWithoutSkew(
mapOutputStatistics: Seq[Option[MapOutputStatistics]],
targetSize: Long,
- minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
+ minPartitionSize: Long,
+ shuffleStageIds: Seq[Int]): Seq[Seq[ShufflePartitionSpec]] = {
// `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = mapOutputStatistics.flatten
@@ -93,11 +95,15 @@ object ShufflePartitionsUtil extends Logging {
// in that case. For example when we union fully aggregated data (data is arranged to a single
// partition) and a result of a SortMergeJoin (multiple partitions).
if (validMetrics.map(_.bytesByPartitionId.length).distinct.length > 1) {
+ logWarning(s"Could not apply partition coalescing because numOfPartitions of " +
+ s"ShuffleStages in the same coalesce group are not equal. " +
+ s"Problematic ShuffleQueryStage(s): ${shuffleStageIds.mkString(", ")} and " +
+ s"Problematic mapOutputStatistics: ${getMapOutputStatisticsDetails(validMetrics)}")
return Seq.empty
}
val numPartitions = validMetrics.head.bytesByPartitionId.length
- val newPartitionSpecs = coalescePartitions(
+ val newPartitionSpecs = coalescePartitionsAndGetSpecs(
0, numPartitions, validMetrics, targetSize, minPartitionSize)
if (newPartitionSpecs.length < numPartitions) {
attachDataSize(mapOutputStatistics, newPartitionSpecs)
@@ -110,7 +116,8 @@ object ShufflePartitionsUtil extends Logging {
mapOutputStatistics: Seq[Option[MapOutputStatistics]],
inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
targetSize: Long,
- minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
+ minPartitionSize: Long,
+ shuffleStageIds: Seq[Int]): Seq[Seq[ShufflePartitionSpec]] = {
// Do not coalesce if any of the map output stats are missing or if not all shuffles have
// partition specs, which should not happen in practice.
if (!mapOutputStatistics.forall(_.isDefined) || !inputPartitionSpecs.forall(_.isDefined)) {
@@ -131,7 +138,9 @@ object ShufflePartitionsUtil extends Logging {
// There should be no unexpected partition specs and the start indices should be identical
// across all different shuffles.
if (partitionIndicesSeq.distinct.length > 1 || partitionIndicesSeq.head.exists(_ < 0)) {
- logWarning(s"Could not apply partition coalescing because of unexpected partition indices.")
+ logWarning(s"Could not apply partition coalescing because of unexpected partition indices. " +
+ s"Problematic ShuffleQueryStage(s): ${shuffleStageIds.mkString(", ")}. " +
+ s"Invalid shuffle partition specs: $inputPartitionSpecs.")
return Seq.empty
}
@@ -151,7 +160,7 @@ object ShufflePartitionsUtil extends Logging {
val repeatValue = partitionIndices(i)
// coalesce any partitions before partition(i - 1) and after the end of latest skew section.
if (i - 1 > start) {
- val partitionSpecs = coalescePartitions(
+ val partitionSpecs = coalescePartitionsAndGetSpecs(
partitionIndices(start),
repeatValue,
validMetrics,
@@ -182,7 +191,7 @@ object ShufflePartitionsUtil extends Logging {
}
// coalesce any partitions after the end of last skew section.
if (numPartitions > start) {
- val partitionSpecs = coalescePartitions(
+ val partitionSpecs = coalescePartitionsAndGetSpecs(
partitionIndices(start),
partitionIndices.last + 1,
validMetrics,
@@ -200,6 +209,12 @@ object ShufflePartitionsUtil extends Logging {
}
}
+ private def getMapOutputStatisticsDetails(runtimeStats: Seq[MapOutputStatistics]): String = {
+ runtimeStats.map { stats =>
+ s"shuffleId: ${stats.shuffleId} -> numOfPartitions: ${stats.bytesByPartitionId.length}"
+ }.mkString(", ")
+ }
+
/**
* Coalesce the partitions of [start, end) from multiple shuffles. This method assumes that all
* the shuffles have the same number of partitions, and the partitions of same index will be read
@@ -226,7 +241,7 @@ object ShufflePartitionsUtil extends Logging {
* CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
* CoalescedPartitionSpec(3, 5).
*/
- private def coalescePartitions(
+ private def coalescePartitionsAndGetSpecs(
start: Int,
end: Int,
mapOutputStatistics: Seq[MapOutputStatistics],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 28762f01d7a2..3ef22ccb77e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -315,7 +315,8 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper
}
}
- test("SPARK-46590 adaptive query execution works correctly with broadcast join and union") {
+ test("SPARK-46590 adaptive query execution works correctly with broadcast nested loop join " +
+ "and union") {
val test: SparkSession => Unit = { spark: SparkSession =>
import spark.implicits._
spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB")
@@ -376,6 +377,60 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper
withSparkSession(test, 100, None)
}
+ test("SPARK-55461 adaptive query execution works correctly with broadcast hash join and union") {
+ val test: SparkSession => Unit = { spark: SparkSession =>
+ import spark.implicits._
+ spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
+ spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+ val df00 = spark.range(0, 1000, 2)
+ .selectExpr("id as key1", "id as value1")
+ .union(Seq.fill(100000)((600, 600)).toDF("key1", "value1"))
+ val df01 = spark.range(0, 1000, 3)
+ .selectExpr("id as key1", "id as value1")
+ val df10 = spark.range(0, 1000, 5)
+ .selectExpr("id as key2", "id as value2")
+ .union(Seq.fill(500000)((600, 600)).toDF("key2", "value2"))
+ val df11 = spark.range(0, 1000, 7)
+ .selectExpr("id as key2", "id as value2")
+ val df20 = spark.range(0, 10).selectExpr("id as key2", "id as value2")
+
+ val unionDF = df00.join(df01, Array("key1", "value1"), "left_outer")
+ .union(df10.join(df11, Array("key2", "value2"), "left_outer"))
+ // Adding equi-join to trigger BHJ
+ df20.hint("broadcast").join(unionDF, $"key1" === $"key2" && $"value1" === $"value2")
+ .write
+ .format("noop")
+ .mode("overwrite")
+ .save()
+ }
+ withSparkSession(test, 12000, None)
+ }
+
+ test("SPARK-55461 adaptive query execution works correctly with broadcast hash join and " +
+ "nested union and non-skewed smj") {
+ val test: SparkSession => Unit = { spark: SparkSession =>
+ import spark.implicits._
+ spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
+ spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+ val df00 = spark.range(0, 1000, 2)
+ .selectExpr("id as key1", "id as value1")
+ val df01 = spark.range(0, 1000, 3)
+ .selectExpr("id as key1", "id as value1")
+ val df10 = spark.range(0, 1000, 5)
+ .selectExpr("id as key2", "id as value2")
+ .union(Seq.fill(500000)((600, 600)).toDF("key2", "value2"))
+ val df20 = spark.range(0, 10).selectExpr("id as key2", "id as value2")
+
+ val unionDF = df00.join(df01, Array("key1", "value1"), "left_outer")
+ .union(df10.groupBy("key2").count())
+ val result = df20.hint("broadcast")
+ .join(unionDF, $"key1" === $"key2" && $"value1" === $"value2")
+ .count()
+ assert(result == 5, s"Query result should be 5 (expected) but $result (actual)")
+ }
+ withSparkSession(test, 12000, None)
+ }
+
test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
val test: SparkSession => Unit = { spark: SparkSession =>
withSQLConf("spark.sql.exchange.reuse" -> "true") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]