This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a823f95c5220 [SPARK-52962][SQL] BroadcastExchangeExec should not reset metrics
a823f95c5220 is described below
commit a823f95c5220bfed2de4c30b630cd04f409edd76
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Jul 26 09:11:38 2025 -0700
[SPARK-52962][SQL] BroadcastExchangeExec should not reset metrics
### What changes were proposed in this pull request?
This patch implements a no-op `resetMetrics` method for
`BroadcastExchangeExec`.
### Why are the changes needed?
This was found while debugging test failures at
https://github.com/apache/spark/pull/51623.
A materialized broadcast exchange (i.e., `BroadcastExchangeExec`) operator
will be trimmed by AQE's empty relation propagation rule to an empty local
relation. That is because its metrics were reset. A `BroadcastExchangeExec`
that has been materialized won't be materialized again, so its metrics won't
be updated again.
It is weird and inconsistent that a materialized broadcast exchange still
holds its broadcast value while its metrics are reset.
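The failure mode can be illustrated with a minimal, Spark-free sketch. Everything in it is a hypothetical stand-in: `Metric`, `MaterializedNode`, `NoResetNode`, and `looksEmpty` are toy names that are not part of Spark, and the emptiness check only approximates how a row-count metric might feed an optimizer decision. It is meant to show the reasoning, not the actual AQE code path.

```scala
object ResetSketch {
  // Hypothetical stand-in for a SQL metric; not Spark's SQLMetric.
  final class Metric {
    private var v: Long = 0L
    def add(n: Long): Unit = { v += n }
    def reset(): Unit = { v = 0L }
    def value: Long = v
  }

  // A node that computes its result once and caches it,
  // similar in spirit to a materialized broadcast exchange.
  class MaterializedNode(rows: Seq[Int]) {
    val numOutputRows = new Metric
    // Evaluated at most once; this is the only place the metric is updated.
    lazy val result: Seq[Int] = {
      numOutputRows.add(rows.size.toLong)
      rows
    }
    def resetMetrics(): Unit = numOutputRows.reset()
  }

  // Same node with a no-op reset, mirroring the approach taken in this patch.
  class NoResetNode(rows: Seq[Int]) extends MaterializedNode(rows) {
    override def resetMetrics(): Unit = ()
  }

  // Assumed optimizer-style check that trusts the row-count metric.
  def looksEmpty(node: MaterializedNode): Boolean =
    node.numOutputRows.value == 0L

  // Materialize, reset, then materialize again and report the metric.
  def rowsSeenAfterReset(node: MaterializedNode): Long = {
    val first = node.result
    node.resetMetrics()
    val second = node.result // cached, so the metric is not updated again
    require(first == second)
    node.numOutputRows.value
  }

  def main(args: Array[String]): Unit = {
    println(rowsSeenAfterReset(new MaterializedNode(Seq(1)))) // prints 0
    println(rowsSeenAfterReset(new NoResetNode(Seq(1))))      // prints 1
  }
}
```

In the sketch, the plain node still holds one row after the reset, yet `looksEmpty` would report it as empty. The no-op override keeps the metric consistent with the cached result.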
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51673 from viirya/broadcast_resetmetrics.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../sql/execution/exchange/BroadcastExchangeExec.scala | 6 ++++++
.../spark/sql/execution/BroadcastExchangeSuite.scala | 15 +++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c70ee637a248..7f69657988a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -145,6 +145,12 @@ case class BroadcastExchangeExec(
     Statistics(dataSize, Some(rowCount))
   }
+  override def resetMetrics(): Unit = {
+    // no-op
+    // BroadcastExchangeExec after materialized won't be materialized again, so we should not
+    // reset the metrics. Otherwise, we will lose the metrics collected in the broadcast job.
+  }
+
   @transient
   private lazy val promise = Promise[broadcast.Broadcast[Any]]()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
index 60a74a553bc4..8d6ee83f5e6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
@@ -98,6 +98,21 @@ class BroadcastExchangeSuite extends SparkPlanTest
       assert(joinDF.collect().length == 1)
     }
   }
+
+  test("SPARK-52962: broadcast exchange should not reset metrics") {
+    val df = spark.range(1).toDF()
+    val joinDF = df.join(broadcast(df), "id")
+    joinDF.collect()
+    val broadcastExchangeExec = collect(
+      joinDF.queryExecution.executedPlan) { case p: BroadcastExchangeExec => p }
+    assert(broadcastExchangeExec.size == 1, "one and only BroadcastExchangeExec")
+
+    val broadcastExchangeNode = broadcastExchangeExec.head
+    val metrics = broadcastExchangeNode.metrics
+    assert(metrics("numOutputRows").value == 1)
+    broadcastExchangeNode.resetMetrics()
+    assert(metrics("numOutputRows").value == 1)
+  }
 }
 // Additional tests run in 'local-cluster' mode.