This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 1d6665ae8e7d [SPARK-54749][SQL] Fix incorrect numOutputRows metric in OneRowRelationExec
1d6665ae8e7d is described below
commit 1d6665ae8e7d6caac75e2b3ebb6588b4788f2727
Author: Fu Chen <[email protected]>
AuthorDate: Thu Dec 18 21:07:33 2025 +0800
[SPARK-54749][SQL] Fix incorrect numOutputRows metric in OneRowRelationExec
### What changes were proposed in this pull request?
This PR moves the `numOutputRows` metric update logic from the `rdd` val's initialization block to the `doExecute()` method in `OneRowRelationExec`.
### Why are the changes needed?
Currently, the `numOutputRows` metric in `OneRowRelationExec` is incorrectly incremented twice under whole-stage codegen, so the SQL UI displays 2 instead of 1 for the single output row.
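The double count happens in the whole-stage codegen path, where the generated loop for this leaf node also bumps `numOutputRows` as it consumes the input RDD, on top of the increment the old `rdd.map` performed. A minimal, self-contained sketch of that shape (plain Scala with hypothetical names, not Spark's actual classes):

```scala
// Two increments per row: one in the shared row source (the old rdd.map)
// and one in the consuming, codegen-like loop.
object DoubleCountSketch {
  var numOutputRows = 0L

  // Pre-fix shape: the cached row source itself updates the metric.
  def rowsWithMetric(): Iterator[String] =
    Iterator("").map { r => numOutputRows += 1; r }

  def main(args: Array[String]): Unit = {
    // The consumer (standing in for the generated code) also counts each row.
    rowsWithMetric().foreach(_ => numOutputRows += 1)
    println(numOutputRows) // 2 for a single row: the reported bug
  }
}
```

Moving the increment into `doExecute()` keeps the cached RDD side-effect free, so each execution path counts a row exactly once.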
Before this PR:

<img width="251" height="318" alt="WeCom screenshot_70a269d7-7e34-40a4-a0f4-abeae7283a79" src="https://github.com/user-attachments/assets/4a376f45-e89d-4fa6-8ba9-a564663467cf" />
After this PR:

<img width="242" height="314" alt="WeCom screenshot_d7fab096-024e-440b-94e1-707296258c55" src="https://github.com/user-attachments/assets/036de957-9ba9-4b7d-a89e-ed737d23eae2" />
### Does this PR introduce _any_ user-facing change?
No, this is a bug fix only.
### How was this patch tested?
Added a unit test in `SQLMetricsSuite`.
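For reference, a hedged sketch of the same check done by hand, mirroring the test added in the diff below (note that `listenerBus` is `private[spark]`, so this only compiles inside Spark's own test sources):

```scala
import org.apache.spark.sql.execution.OneRowRelationExec

val df = spark.sql("select 1 as c1")
df.collect()
// Wait for the metric updates to be delivered on the driver.
spark.sparkContext.listenerBus.waitUntilEmpty()
val node = df.queryExecution.executedPlan.collect {
  case o: OneRowRelationExec => o
}.head
// With the fix, a single output row reports exactly 1.
assert(node.metrics("numOutputRows").value == 1L)
```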
### Was this patch authored or co-authored using generative AI tooling?
Closes #53520 from cfmcgrady/fix-onerowrelation-metrics.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a26c20953bb3262ce4da1f609457d7525e37af1a)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/execution/ExistingRDD.scala | 14 ++++++++------
.../spark/sql/execution/metric/SQLMetricsSuite.scala | 19 +++++++++++++++++++
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6148fb30783e..06085497de19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -360,23 +360,25 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override val output: Seq[Attribute] = Nil
 
   private val rdd: RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
     session
       .sparkContext
       .parallelize(Seq(""), 1)
       .mapPartitionsInternal { _ =>
         val proj = UnsafeProjection.create(Seq.empty[Expression])
-        Iterator(proj.apply(InternalRow.empty)).map { r =>
-          numOutputRows += 1
-          r
-        }
+        Iterator(proj.apply(InternalRow.empty))
       }
   }
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
 
   override def simpleString(maxFields: Int): String = s"$nodeName[]"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 402365a59ece..f2e9121d566c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -107,6 +107,25 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("SPARK-54749: OneRowRelation metrics") {
+    Seq((1L, "false"), (2L, "true")).foreach { case (nodeId, enableWholeStage) =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage) {
+        val df = spark.sql("select 1 as c1")
+        val oneRowRelation = df.queryExecution.executedPlan.collect {
+          case oneRowRelation: OneRowRelationExec => oneRowRelation
+        }
+        df.collect()
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(oneRowRelation.size == 1)
+
+        val expected = Map("number of output rows" -> 1L)
+        testSparkPlanMetrics(df.toDF(), 1, Map(
+          nodeId -> (("Scan OneRowRelation", expected))))
+      }
+    }
+  }
+
+
   test("Recursive CTEs metrics") {
     withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
       val df = sql(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]