This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 1d6665ae8e7d [SPARK-54749][SQL] Fix incorrect numOutputRows metric in OneRowRelationExec
1d6665ae8e7d is described below

commit 1d6665ae8e7d6caac75e2b3ebb6588b4788f2727
Author: Fu Chen <[email protected]>
AuthorDate: Thu Dec 18 21:07:33 2025 +0800

    [SPARK-54749][SQL] Fix incorrect numOutputRows metric in OneRowRelationExec
    
    ### What changes were proposed in this pull request?
    
    This PR moves the `numOutputRows` metric update from the initialization block of the `rdd` val to the `doExecute()` method in `OneRowRelationExec`.
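    
    In effect, a simplified sketch of the change (mirroring the diff below; `rdd` and `longMetric` come from the existing class):
    
    ```scala
    // Before: the increment lived inside `rdd` itself, so every consumer of
    // the RDD bumped the metric, including the whole-stage-codegen path,
    // which already counts output rows in its generated code.
    // After: the increment happens only in doExecute(), which the codegen
    // path bypasses.
    protected override def doExecute(): RDD[InternalRow] = {
      val numOutputRows = longMetric("numOutputRows")
      rdd.map { r =>
        numOutputRows += 1
        r
      }
    }
    ```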
    
    ### Why are the changes needed?
    
    Currently, the `numOutputRows` metric in `OneRowRelationExec` is incremented twice when whole-stage codegen is enabled: once by the generated code that consumes the node's RDD, and once inside the RDD's `mapPartitionsInternal` closure. As a result, the metric displays 2 instead of 1 for the single output row.
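    
    A minimal way to observe the value (a hedged sketch, not part of this patch; it assumes an active `SparkSession` named `spark`, and `OneRowRelationExec` is internal API, so this is illustrative rather than stable user code):
    
    ```scala
    import org.apache.spark.sql.execution.OneRowRelationExec
    
    val df = spark.sql("select 1 as c1")
    df.collect()
    
    // Find the OneRowRelationExec leaf in the executed plan and read its metric.
    val node = df.queryExecution.executedPlan.collect {
      case r: OneRowRelationExec => r
    }.head
    // Before this fix, with whole-stage codegen enabled, this printed 2.
    println(node.metrics("numOutputRows").value)
    ```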
    
    Before this PR:
    
    <img width="251" height="318" 
alt="企业微信截图_70a269d7-7e34-40a4-a0f4-abeae7283a79" 
src="https://github.com/user-attachments/assets/4a376f45-e89d-4fa6-8ba9-a564663467cf";
 />
    
    After this PR:
    
    <img width="242" height="314" 
alt="企业微信截图_d7fab096-024e-440b-94e1-707296258c55" 
src="https://github.com/user-attachments/assets/036de957-9ba9-4b7d-a89e-ed737d23eae2";
 />
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is only a bug fix.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #53520 from cfmcgrady/fix-onerowrelation-metrics.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit a26c20953bb3262ce4da1f609457d7525e37af1a)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/execution/ExistingRDD.scala  | 14 ++++++++------
 .../spark/sql/execution/metric/SQLMetricsSuite.scala  | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6148fb30783e..06085497de19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -360,23 +360,25 @@ case class OneRowRelationExec() extends LeafExecNode
   override val output: Seq[Attribute] = Nil
 
   private val rdd: RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
     session
       .sparkContext
       .parallelize(Seq(""), 1)
       .mapPartitionsInternal { _ =>
         val proj = UnsafeProjection.create(Seq.empty[Expression])
-        Iterator(proj.apply(InternalRow.empty)).map { r =>
-          numOutputRows += 1
-          r
-        }
+        Iterator(proj.apply(InternalRow.empty))
       }
   }
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
 
   override def simpleString(maxFields: Int): String = s"$nodeName[]"
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 402365a59ece..f2e9121d566c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -107,6 +107,25 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("SPARK-54749: OneRowRelation metrics") {
+    Seq((1L, "false"), (2L, "true")).foreach { case (nodeId, enableWholeStage) =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage) {
+        val df = spark.sql("select 1 as c1")
+        val oneRowRelation = df.queryExecution.executedPlan.collect {
+          case oneRowRelation: OneRowRelationExec => oneRowRelation
+        }
+        df.collect()
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(oneRowRelation.size == 1)
+
+        val expected = Map("number of output rows" -> 1L)
+        testSparkPlanMetrics(df.toDF(), 1, Map(
+          nodeId -> (("Scan OneRowRelation", expected))))
+      }
+    }
+  }
+
+
   test("Recursive CTEs metrics") {
     withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
       val df = sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
