This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ae75efeb407f [SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec
ae75efeb407f is described below

commit ae75efeb407fd3a5aa924d85c2471dcd32028063
Author: ziqi liu <ziqi....@databricks.com>
AuthorDate: Thu Jul 24 12:27:30 2025 +0800

    [SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec

    ### What changes were proposed in this pull request?

    A follow-up to https://github.com/apache/spark/pull/50273: always show the empty partition number metric in AQEShuffleReadExec.

    ### Why are the changes needed?

    Even when there is no coalescing, we still want to know about empty partitions: imagine we have shuffle skew where each non-empty partition is large, so no coalescing happens, yet there are many empty partitions.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Updated UT

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51608 from liuzqt/SPARK-51505.

    Authored-by: ziqi liu <ziqi....@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AQEShuffleReadExec.scala    | 27 +++++++++++-----------
 .../execution/CoalesceShufflePartitionsSuite.scala | 18 +++++++--------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  2 +-
 3 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index e8b70f94a769..2a600b31cc29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -178,6 +178,15 @@ case class AQEShuffleReadExec private(
     numPartitionsMetric.set(partitionSpecs.length)
     driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
 
+    val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
+    val numEmptyPartitions = child match {
+      case s: ShuffleQueryStageExec =>
+        s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
+      case _ => 0
+    }
+    numEmptyPartitionsMetric.set(numEmptyPartitions)
+    driverAccumUpdates += (numEmptyPartitionsMetric.id -> numEmptyPartitions.toLong)
+
     if (hasSkewedPartition) {
       val skewedSpecs = partitionSpecs.collect {
         case p: PartialReducerPartitionSpec => p
@@ -200,15 +209,7 @@ case class AQEShuffleReadExec private(
       val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
       val x = partitionSpecs.count(isCoalescedSpec)
       numCoalescedPartitionsMetric.set(x)
-      val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
-      val y = child match {
-        case s: ShuffleQueryStageExec =>
-          s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
-        case _ => 0
-      }
-      numEmptyPartitionsMetric.set(y)
-      driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x,
-        numEmptyPartitionsMetric.id -> y)
+      driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x)
     }
 
     partitionDataSizes.foreach { dataSizes =>
@@ -223,7 +224,9 @@ case class AQEShuffleReadExec private(
 
   @transient override lazy val metrics: Map[String, SQLMetric] = {
     if (shuffleStage.isDefined) {
-      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
+      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"),
+        "numEmptyPartitions" ->
+          SQLMetrics.createMetric(sparkContext, "number of empty partitions")) ++ {
         if (isLocalRead) {
           // We split the mapper partition evenly when creating local shuffle read, so no
           // data size info is available.
@@ -244,9 +247,7 @@ case class AQEShuffleReadExec private(
     } ++ {
       if (hasCoalescedPartition) {
         Map("numCoalescedPartitions" ->
-          SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"),
-          "numEmptyPartitions" ->
-            SQLMetrics.createMetric(sparkContext, "number of empty partitions"))
+          SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
       } else {
         Map.empty
       }
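As context for the hunks above: the moved metric is simply a count of zero-byte reducer partitions in the shuffle's map output statistics. Below is a minimal, standalone sketch of that computation; the object name and byte sizes are illustrative and not from the patch, and the `bytesByPartitionId` array stands in for the real `MapOutputStatistics` field. Only the `count(_ == 0)` expression mirrors the committed code.

```scala
// Sketch only: mirrors stats.bytesByPartitionId.count(_ == 0) from the patch.
object EmptyPartitionCountSketch {
  // Count reducer partitions that received no shuffle data.
  def countEmpty(bytesByPartitionId: Array[Long]): Int =
    bytesByPartitionId.count(_ == 0)

  def main(args: Array[String]): Unit = {
    // Made-up sizes modeling the scenario in the commit message: every
    // non-empty partition is large (so AQE would not coalesce), yet most
    // partitions are empty.
    val bytesByPartitionId = Array[Long](512L << 20, 0L, 0L, 0L, 256L << 20, 0L)
    println(s"empty partitions: ${countEmpty(bytesByPartitionId)}") // prints 4
  }
}
```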
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 4b650957e42c..28762f01d7a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -503,16 +503,14 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper
 
   test("SPARK-51505: log empty partition number metrics") {
     val test: SparkSession => Unit = { spark: SparkSession =>
-      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
-        val df = spark.range(0, 1000, 1, 5).withColumn("value", when(col("id") < 500, 0)
-          .otherwise(1)).groupBy("value").agg("value" -> "sum")
-        df.collect()
-        val plan = df.queryExecution.executedPlan
-        val coalesce = collectFirst(plan) {
-          case e: AQEShuffleReadExec => e
-        }.get
-        assert(coalesce.metrics("numEmptyPartitions").value == 3)
-      }
+      val df = spark.range(0, 1000, 1, 10).withColumn("value", expr("id % 3"))
+        .groupBy("value").agg("value" -> "sum")
+      df.collect()
+      val plan = df.queryExecution.executedPlan
+      val coalesce = collectFirst(plan) {
+        case e: AQEShuffleReadExec => e
+      }.get
+      assert(coalesce.metrics("numEmptyPartitions").value == 2)
     }
     withSparkSession(test, 100, None)
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 5d81239d023e..05c1012200df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1124,7 +1124,7 @@ class AdaptiveQueryExecSuite
       assert(reads.length == 1)
       val read = reads.head
       assert(read.isLocalRead)
-      assert(read.metrics.keys.toSeq == Seq("numPartitions"))
+      assert(read.metrics.keys.toSeq == Seq("numPartitions", "numEmptyPartitions"))
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org