This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae75efeb407f [SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec
ae75efeb407f is described below

commit ae75efeb407fd3a5aa924d85c2471dcd32028063
Author: ziqi liu <ziqi....@databricks.com>
AuthorDate: Thu Jul 24 12:27:30 2025 +0800

    [SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec
    
    ### What changes were proposed in this pull request?
    A follow-up to https://github.com/apache/spark/pull/50273: always show the empty partition number metric in AQEShuffleReadExec.
    
    ### Why are the changes needed?
    Even when no coalescing happens, we still want to know how many partitions are empty: imagine a skewed shuffle where every non-empty partition is large, so no coalescing happens, yet many partitions are empty.
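    For concreteness, a minimal sketch of that scenario (hypothetical job and session setup, not taken from this patch):

    ```scala
    import org.apache.spark.sql.SparkSession

    // Assumed local session, purely for illustration.
    val spark = SparkSession.builder().master("local[4]").getOrCreate()

    // Many reducer partitions but only two distinct keys, so at most two of
    // the 200 reducer partitions receive data and the rest stay empty. If the
    // non-empty partitions are large, AQE will not coalesce them, yet the
    // number of empty partitions is still worth surfacing.
    spark.conf.set("spark.sql.shuffle.partitions", "200")
    val df = spark.range(0, 1000000).selectExpr("id % 2 AS key")
      .groupBy("key").count()
    df.collect()
    // With this change, the AQEShuffleReadExec node reports the
    // "number of empty partitions" metric whether or not coalescing happened.
    ```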
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51608 from liuzqt/SPARK-51505.
    
    Authored-by: ziqi liu <ziqi....@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AQEShuffleReadExec.scala    | 27 +++++++++++-----------
 .../execution/CoalesceShufflePartitionsSuite.scala | 18 +++++++--------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  2 +-
 3 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index e8b70f94a769..2a600b31cc29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -178,6 +178,15 @@ case class AQEShuffleReadExec private(
     numPartitionsMetric.set(partitionSpecs.length)
     driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
 
+    val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
+    val numEmptyPartitions = child match {
+      case s: ShuffleQueryStageExec =>
+        s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
+      case _ => 0
+    }
+    numEmptyPartitionsMetric.set(numEmptyPartitions)
+    driverAccumUpdates += (numEmptyPartitionsMetric.id -> numEmptyPartitions.toLong)
+
     if (hasSkewedPartition) {
       val skewedSpecs = partitionSpecs.collect {
         case p: PartialReducerPartitionSpec => p
@@ -200,15 +209,7 @@ case class AQEShuffleReadExec private(
       val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
       val x = partitionSpecs.count(isCoalescedSpec)
       numCoalescedPartitionsMetric.set(x)
-      val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
-      val y = child match {
-        case s: ShuffleQueryStageExec =>
-          s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
-        case _ => 0
-      }
-      numEmptyPartitionsMetric.set(y)
-      driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x,
-        numEmptyPartitionsMetric.id -> y)
+      driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x)
     }
 
     partitionDataSizes.foreach { dataSizes =>
@@ -223,7 +224,9 @@ case class AQEShuffleReadExec private(
 
   @transient override lazy val metrics: Map[String, SQLMetric] = {
     if (shuffleStage.isDefined) {
-      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
partitions")) ++ {
+      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
partitions"),
+        "numEmptyPartitions" ->
+          SQLMetrics.createMetric(sparkContext, "number of empty partitions")) ++ {
         if (isLocalRead) {
           // We split the mapper partition evenly when creating local shuffle read, so no
           // data size info is available.
@@ -244,9 +247,7 @@ case class AQEShuffleReadExec private(
       } ++ {
         if (hasCoalescedPartition) {
           Map("numCoalescedPartitions" ->
-            SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"),
-            "numEmptyPartitions" ->
-              SQLMetrics.createMetric(sparkContext, "number of empty partitions"))
+            SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
         } else {
           Map.empty
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 4b650957e42c..28762f01d7a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -503,16 +503,14 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper
 
   test("SPARK-51505: log empty partition number metrics") {
     val test: SparkSession => Unit = { spark: SparkSession =>
-      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
-        val df = spark.range(0, 1000, 1, 5).withColumn("value", when(col("id") < 500, 0)
-            .otherwise(1)).groupBy("value").agg("value" -> "sum")
-        df.collect()
-        val plan = df.queryExecution.executedPlan
-        val coalesce = collectFirst(plan) {
-          case e: AQEShuffleReadExec => e
-        }.get
-        assert(coalesce.metrics("numEmptyPartitions").value == 3)
-      }
+      val df = spark.range(0, 1000, 1, 10).withColumn("value", expr("id % 3"))
+        .groupBy("value").agg("value" -> "sum")
+      df.collect()
+      val plan = df.queryExecution.executedPlan
+      val coalesce = collectFirst(plan) {
+        case e: AQEShuffleReadExec => e
+      }.get
+      assert(coalesce.metrics("numEmptyPartitions").value == 2)
     }
     withSparkSession(test, 100, None)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 5d81239d023e..05c1012200df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1124,7 +1124,7 @@ class AdaptiveQueryExecSuite
         assert(reads.length == 1)
         val read = reads.head
         assert(read.isLocalRead)
-        assert(read.metrics.keys.toSeq == Seq("numPartitions"))
+        assert(read.metrics.keys.toSeq == Seq("numPartitions", "numEmptyPartitions"))
         assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
