This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a823f95c5220 [SPARK-52962][SQL] BroadcastExchangeExec should not reset metrics
a823f95c5220 is described below

commit a823f95c5220bfed2de4c30b630cd04f409edd76
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Jul 26 09:11:38 2025 -0700

    [SPARK-52962][SQL] BroadcastExchangeExec should not reset metrics
    
    ### What changes were proposed in this pull request?
    
    This patch implements a no-op `resetMetrics` method for `BroadcastExchangeExec`.
    
    ### Why are the changes needed?
    
    This was found while debugging test failures in https://github.com/apache/spark/pull/51623.
    
    A materialized broadcast exchange (i.e., `BroadcastExchangeExec`) operator can be trimmed to an empty local relation by AQE's empty relation propagation rule. This happens because its metrics were reset, so the row count it reports drops to 0 and the exchange looks empty. Once materialized, a `BroadcastExchangeExec` is never materialized again, so its metrics are never updated again.
    
    It is inconsistent for a materialized broadcast exchange to still hold its broadcast value while its metrics have been reset.
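    
    The failure mode can be illustrated with a toy model. This is only a sketch and not Spark code: `ToyMetric`, `ToyNode`, `ToyBroadcastNode`, and `looksEmpty` are hypothetical names standing in for a SQL metric, a plan node, a broadcast exchange, and an emptiness check based on the reported row count.
    
    ```scala
    object ToyResetDemo {
      // Hypothetical stand-in for a SQL metric: a mutable counter that can be reset.
      final class ToyMetric(var value: Long = 0L) {
        def reset(): Unit = { value = 0L }
      }

      // Hypothetical plan node whose default resetMetrics zeroes every metric.
      class ToyNode {
        val metrics: Map[String, ToyMetric] = Map("numOutputRows" -> new ToyMetric())
        def resetMetrics(): Unit = metrics.values.foreach(_.reset())
        // The reported row count is derived from the metric.
        def rowCount: Long = metrics("numOutputRows").value
      }

      // Hypothetical broadcast node: it runs only once, so resetting is a no-op.
      class ToyBroadcastNode extends ToyNode {
        override def resetMetrics(): Unit = ()
      }

      // Hypothetical emptiness check: a node reporting 0 rows looks empty.
      def looksEmpty(node: ToyNode): Boolean = node.rowCount == 0L

      def main(args: Array[String]): Unit = {
        val plain = new ToyNode
        val bcast = new ToyBroadcastNode
        // Pretend both nodes produced one row when they ran.
        plain.metrics("numOutputRows").value = 1L
        bcast.metrics("numOutputRows").value = 1L

        plain.resetMetrics()
        bcast.resetMetrics()

        println(looksEmpty(plain)) // true: the row count was lost
        println(looksEmpty(bcast)) // false: the row count survived
      }
    }
    ```
    
    In this sketch the plain node is wrongly classified as empty after the reset, while the node with the no-op override keeps its row count.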
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51673 from viirya/broadcast_resetmetrics.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
 .../sql/execution/exchange/BroadcastExchangeExec.scala    |  6 ++++++
 .../spark/sql/execution/BroadcastExchangeSuite.scala      | 15 +++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c70ee637a248..7f69657988a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -145,6 +145,12 @@ case class BroadcastExchangeExec(
     Statistics(dataSize, Some(rowCount))
   }
 
+  override def resetMetrics(): Unit = {
+    // no-op
+    // BroadcastExchangeExec after materialized won't be materialized again, so we should not
+    // reset the metrics. Otherwise, we will lose the metrics collected in the broadcast job.
+  }
+
   @transient
   private lazy val promise = Promise[broadcast.Broadcast[Any]]()
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
index 60a74a553bc4..8d6ee83f5e6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
@@ -98,6 +98,21 @@ class BroadcastExchangeSuite extends SparkPlanTest
       assert(joinDF.collect().length == 1)
     }
   }
+
+  test("SPARK-52962: broadcast exchange should not reset metrics") {
+    val df = spark.range(1).toDF()
+    val joinDF = df.join(broadcast(df), "id")
+    joinDF.collect()
+    val broadcastExchangeExec = collect(
+      joinDF.queryExecution.executedPlan) { case p: BroadcastExchangeExec => p }
+    assert(broadcastExchangeExec.size == 1, "one and only BroadcastExchangeExec")
+
+    val broadcastExchangeNode = broadcastExchangeExec.head
+    val metrics = broadcastExchangeNode.metrics
+    assert(metrics("numOutputRows").value == 1)
+    broadcastExchangeNode.resetMetrics()
+    assert(metrics("numOutputRows").value == 1)
+  }
 }
 
 // Additional tests run in 'local-cluster' mode.


