This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a85450403e4 [SPARK-55461][SQL] Improve AQE Coalesce Grouping warning 
messages when numOfPartitions of ShuffleStages in the same coalesce group are 
not equal
8a85450403e4 is described below

commit 8a85450403e4c3a4a336bef9dad9946021dd6aba
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Wed Mar 4 21:03:22 2026 +0800

    [SPARK-55461][SQL] Improve AQE Coalesce Grouping warning messages when 
numOfPartitions of ShuffleStages in the same coalesce group are not equal
    
    ### What changes were proposed in this pull request?
    Spark Adaptive Query Execution(AQE) framework coalesces the eligible 
shuffle partitions (e.g: empty/small-sized shuffle partitions) during the query 
execution. This optimization requires to be created coalesce groups for related 
Shuffle Stages (e.g: `SortMergeJoin` can have 1 `ShuffleQueryStage` per 
join-leg) to guarantee that both SMJ legs having the same num of partitions. To 
create coalesce groups for related `ShuffleStages`, Spark Plan Tree needs to be 
traversed by finding `Shuffl [...]
    **1-** Adding warning log message to 
`ShufflePartitionsUtil.coalescePartitionsWithoutSkew()` when numOfPartitions of 
ShuffleStages in the same coalesce group are not equal. This is required for 
the consistency because `ShufflePartitionsUtil.coalescePartitionsWithSkew()` 
logs warning message for the same case,
    
    **2-** Adding problematic shuffleStageIds to warning messages when 
numOfPartitions of ShuffleStages in the same coalesce group are not equal. This 
info can help for troubleshooting.
    
    **3-** Aligning the warning logs for specially for both 
`ShufflePartitionsUtil.coalescePartitionsWithoutSkew()` and 
`coalescePartitionsWithSkew()` cases
    
    **4-** 2 new UT cases are being added :
    Current UT Cases cover following use cases:
    ```
    skewed SortMergeJoin under Union under BroadcastNestedLoopJoin (BNLJ),
    skewed SortMergeJoin under Union under CartesianProduct
    ```
    This PR also adds following new UT cases for following legacy use-cases:
    **4.1-** skewed SortMergeJoin under Union under BroadcastHashJoin,
    **4.2-** non-skewed SortMergeJoin under Union under BroadcastHashJoin
    
    5- `private def coalescePartitions()` needs to be renamed because Scala 
does not allow the existence of default values in multiple overloaded methods. 
This causes following Scala compile-time problem:
    ```
    [ERROR] [Error] 
/Users/eren.avsarogullari/Development/OSS/ossspark/sql/core/src/main/scala/
    org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:26:
    in object ShufflePartitionsUtil, multiple overloaded alternatives of method 
coalescePartitions define default arguments.
    ```
    ### Why are the changes needed?
    - Warning message is useful for troubleshooting when numOfPartitions of 
`ShuffleStages` in the same coalesce group are not equal,
    - Problematic `ShuffleStageIds` can also help for the trouble shooting,
    - Additional UT coverage is also useful to verify coalesce grouping logic 
to avoid SPARK-46590 kinds of issues.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, adding new warning message when `numOfPartitions` of `ShuffleStages` 
in the same coalesce group are not equal
    
    ### How was this patch tested?
    Added 2 new UT cases for existing use-cases to test coalesce grouping logic 
such as:
    - skewed SortMergeJoin under Union under BroadcastHashJoin,
    - non-skewed SortMergeJoin under Union under BroadcastHashJoin
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #54242 from erenavsarogullari/SPARK-55461.
    
    Authored-by: Eren Avsarogullari <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../adaptive/CoalesceShufflePartitions.scala       |  3 +-
 .../execution/adaptive/ShufflePartitionsUtil.scala | 35 +++++++++----
 .../execution/CoalesceShufflePartitionsSuite.scala | 57 +++++++++++++++++++++-
 3 files changed, 83 insertions(+), 12 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 3fdcb17bdeae..f172942f6ec5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -109,7 +109,8 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
         coalesceGroup.shuffleStages.map(_.partitionSpecs),
         advisoryTargetSize = advisoryTargetSize,
         minNumPartitions = minNumPartitions,
-        minPartitionSize = minPartitionSize)
+        minPartitionSize = minPartitionSize,
+        coalesceGroup.shuffleStages.map(_.shuffleStage.id))
 
       if (newPartitionSpecs.nonEmpty) {
         coalesceGroup.shuffleStages.zip(newPartitionSpecs).map { case 
(stageInfo, partSpecs) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index b7cf0ce3150b..fc7d556a6bf7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -46,7 +46,8 @@ object ShufflePartitionsUtil extends Logging {
       inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
       advisoryTargetSize: Long,
       minNumPartitions: Int,
-      minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
+      minPartitionSize: Long,
+      shuffleStageIds: Seq[Int] = Seq.empty): Seq[Seq[ShufflePartitionSpec]] = 
{
     assert(mapOutputStatistics.length == inputPartitionSpecs.length)
 
     if (mapOutputStatistics.isEmpty) {
@@ -69,17 +70,18 @@ object ShufflePartitionsUtil extends Logging {
     // If `inputPartitionSpecs` are all empty, it means skew join optimization 
is not applied.
     if (inputPartitionSpecs.forall(_.isEmpty)) {
       coalescePartitionsWithoutSkew(
-        mapOutputStatistics, targetSize, minPartitionSize)
+        mapOutputStatistics, targetSize, minPartitionSize, shuffleStageIds)
     } else {
       coalescePartitionsWithSkew(
-        mapOutputStatistics, inputPartitionSpecs, targetSize, minPartitionSize)
+        mapOutputStatistics, inputPartitionSpecs, targetSize, 
minPartitionSize, shuffleStageIds)
     }
   }
 
   private def coalescePartitionsWithoutSkew(
       mapOutputStatistics: Seq[Option[MapOutputStatistics]],
       targetSize: Long,
-      minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
+      minPartitionSize: Long,
+      shuffleStageIds: Seq[Int]): Seq[Seq[ShufflePartitionSpec]] = {
     // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 
partitions,
     // we should skip it when calculating the `partitionStartIndices`.
     val validMetrics = mapOutputStatistics.flatten
@@ -93,11 +95,15 @@ object ShufflePartitionsUtil extends Logging {
     // in that case. For example when we union fully aggregated data (data is 
arranged to a single
     // partition) and a result of a SortMergeJoin (multiple partitions).
     if (validMetrics.map(_.bytesByPartitionId.length).distinct.length > 1) {
+      logWarning(s"Could not apply partition coalescing because 
numOfPartitions of " +
+        s"ShuffleStages in the same coalesce group are not equal. " +
+        s"Problematic ShuffleQueryStage(s): ${shuffleStageIds.mkString(", ")} 
and " +
+        s"Problematic mapOutputStatistics: 
${getMapOutputStatisticsDetails(validMetrics)}")
       return Seq.empty
     }
 
     val numPartitions = validMetrics.head.bytesByPartitionId.length
-    val newPartitionSpecs = coalescePartitions(
+    val newPartitionSpecs = coalescePartitionsAndGetSpecs(
       0, numPartitions, validMetrics, targetSize, minPartitionSize)
     if (newPartitionSpecs.length < numPartitions) {
       attachDataSize(mapOutputStatistics, newPartitionSpecs)
@@ -110,7 +116,8 @@ object ShufflePartitionsUtil extends Logging {
       mapOutputStatistics: Seq[Option[MapOutputStatistics]],
       inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
       targetSize: Long,
-      minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
+      minPartitionSize: Long,
+      shuffleStageIds: Seq[Int]): Seq[Seq[ShufflePartitionSpec]] = {
     // Do not coalesce if any of the map output stats are missing or if not 
all shuffles have
     // partition specs, which should not happen in practice.
     if (!mapOutputStatistics.forall(_.isDefined) || 
!inputPartitionSpecs.forall(_.isDefined)) {
@@ -131,7 +138,9 @@ object ShufflePartitionsUtil extends Logging {
     // There should be no unexpected partition specs and the start indices 
should be identical
     // across all different shuffles.
     if (partitionIndicesSeq.distinct.length > 1 || 
partitionIndicesSeq.head.exists(_ < 0)) {
-      logWarning(s"Could not apply partition coalescing because of unexpected 
partition indices.")
+      logWarning(s"Could not apply partition coalescing because of unexpected 
partition indices. " +
+        s"Problematic ShuffleQueryStage(s): ${shuffleStageIds.mkString(", ")}. 
" +
+        s"Invalid shuffle partition specs: $inputPartitionSpecs.")
       return Seq.empty
     }
 
@@ -151,7 +160,7 @@ object ShufflePartitionsUtil extends Logging {
         val repeatValue = partitionIndices(i)
         // coalesce any partitions before partition(i - 1) and after the end 
of latest skew section.
         if (i - 1 > start) {
-          val partitionSpecs = coalescePartitions(
+          val partitionSpecs = coalescePartitionsAndGetSpecs(
             partitionIndices(start),
             repeatValue,
             validMetrics,
@@ -182,7 +191,7 @@ object ShufflePartitionsUtil extends Logging {
     }
     // coalesce any partitions after the end of last skew section.
     if (numPartitions > start) {
-      val partitionSpecs = coalescePartitions(
+      val partitionSpecs = coalescePartitionsAndGetSpecs(
         partitionIndices(start),
         partitionIndices.last + 1,
         validMetrics,
@@ -200,6 +209,12 @@ object ShufflePartitionsUtil extends Logging {
     }
   }
 
+  private def getMapOutputStatisticsDetails(runtimeStats: 
Seq[MapOutputStatistics]): String = {
+    runtimeStats.map { stats =>
+      s"shuffleId: ${stats.shuffleId} -> numOfPartitions: 
${stats.bytesByPartitionId.length}"
+    }.mkString(", ")
+  }
+
   /**
    * Coalesce the partitions of [start, end) from multiple shuffles. This 
method assumes that all
    * the shuffles have the same number of partitions, and the partitions of 
same index will be read
@@ -226,7 +241,7 @@ object ShufflePartitionsUtil extends Logging {
    *          CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3) and
    *          CoalescedPartitionSpec(3, 5).
    */
-  private def coalescePartitions(
+  private def coalescePartitionsAndGetSpecs(
       start: Int,
       end: Int,
       mapOutputStatistics: Seq[MapOutputStatistics],
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 28762f01d7a2..3ef22ccb77e9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -315,7 +315,8 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with SQLConfHelper
     }
   }
 
-  test("SPARK-46590 adaptive query execution works correctly with broadcast 
join and union") {
+  test("SPARK-46590 adaptive query execution works correctly with broadcast 
nested loop join " +
+    "and union") {
     val test: SparkSession => Unit = { spark: SparkSession =>
       import spark.implicits._
       spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB")
@@ -376,6 +377,60 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with SQLConfHelper
     withSparkSession(test, 100, None)
   }
 
+  test("SPARK-55461 adaptive query execution works correctly with broadcast 
hash join and union") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      import spark.implicits._
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+      val df00 = spark.range(0, 1000, 2)
+        .selectExpr("id as key1", "id as value1")
+        .union(Seq.fill(100000)((600, 600)).toDF("key1", "value1"))
+      val df01 = spark.range(0, 1000, 3)
+        .selectExpr("id as key1", "id as value1")
+      val df10 = spark.range(0, 1000, 5)
+        .selectExpr("id as key2", "id as value2")
+        .union(Seq.fill(500000)((600, 600)).toDF("key2", "value2"))
+      val df11 = spark.range(0, 1000, 7)
+        .selectExpr("id as key2", "id as value2")
+      val df20 = spark.range(0, 10).selectExpr("id as key2", "id as value2")
+
+      val unionDF = df00.join(df01, Array("key1", "value1"), "left_outer")
+        .union(df10.join(df11, Array("key2", "value2"), "left_outer"))
+      // Adding equi-join to trigger BHJ
+      df20.hint("broadcast").join(unionDF, $"key1" === $"key2" && $"value1" 
=== $"value2")
+        .write
+        .format("noop")
+        .mode("overwrite")
+        .save()
+    }
+    withSparkSession(test, 12000, None)
+  }
+
+  test("SPARK-55461 adaptive query execution works correctly with broadcast 
hash join and " +
+    "nested union and non-skewed smj") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      import spark.implicits._
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+      val df00 = spark.range(0, 1000, 2)
+        .selectExpr("id as key1", "id as value1")
+      val df01 = spark.range(0, 1000, 3)
+        .selectExpr("id as key1", "id as value1")
+      val df10 = spark.range(0, 1000, 5)
+        .selectExpr("id as key2", "id as value2")
+        .union(Seq.fill(500000)((600, 600)).toDF("key2", "value2"))
+      val df20 = spark.range(0, 10).selectExpr("id as key2", "id as value2")
+
+      val unionDF = df00.join(df01, Array("key1", "value1"), "left_outer")
+        .union(df10.groupBy("key2").count())
+      val result = df20.hint("broadcast")
+        .join(unionDF, $"key1" === $"key2" && $"value1" === $"value2")
+        .count()
+      assert(result == 5, s"Query result should be 5 (expected) but $result 
(actual)")
+    }
+    withSparkSession(test, 12000, None)
+  }
+
   test("SPARK-24705 adaptive query execution works correctly when exchange 
reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
       withSQLConf("spark.sql.exchange.reuse" -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to