This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new c27b1e10f fix: enable native_datafusion Spark SQL tests previously ignored in #3315 (#3696)
c27b1e10f is described below

commit c27b1e10f4ba63ba9088720049517a9a1bdcd0da
Author: Andy Grove <[email protected]>
AuthorDate: Sun Mar 15 07:59:52 2026 -0600

    fix: enable native_datafusion Spark SQL tests previously ignored in #3315 (#3696)
    
    Add numOutputRows metric alias to CometNativeScanExec so Spark's streaming
    ProgressReporter can find input row counts. Remove IgnoreCometNativeDataFusion
    tags from three Spark SQL tests that now pass with native_datafusion scan.
---
 dev/diffs/3.5.8.diff                               | 56 ++--------------------
 .../spark/sql/comet/CometNativeScanExec.scala      | 10 +++-
 2 files changed, 13 insertions(+), 53 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 3aaecdecb..08b69a6b8 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644
        SQLConf.ANSI_ENABLED.key -> "true") {
        withTable("t") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index fa1a64460fc..1d2e215d6a3 100644
+index fa1a64460fc..134f0db1fb8 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 @@ -17,6 +17,8 @@
@@ -1134,31 +1134,18 @@ index d269290e616..13726a31e07 100644
                }
              }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..b7c234e1437 100644
+index cfc8b2cc845..c4be7eb3731 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
- import scala.collection.mutable.ArrayBuffer
- 
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.SparkConf
--import org.apache.spark.sql.{AnalysisException, QueryTest}
-+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest}
+ import org.apache.spark.sql.{AnalysisException, QueryTest}
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 +import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
  import org.apache.spark.sql.connector.read.ScanBuilder
  import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
-     }
-   }
- 
--  test("Fallback Parquet V2 to V1") {
-+  test("Fallback Parquet V2 to V1",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
-     Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format =>
-       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
-         val commands = ArrayBuffer.empty[(String, LogicalPlan)]
-@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
+@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
              val df = spark.read.format(format).load(path.getCanonicalPath)
              checkAnswer(df, inputData.toDF())
              assert(
@@ -2930,39 +2917,6 @@ index aad91601758..201083bd621 100644
        })
    }
  
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
-index b5cf13a9c12..ac17603fb7f 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
-@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
- 
- import org.apache.spark.{SparkException, TestUtils}
- import org.apache.spark.internal.Logging
--import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
-+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode}
- import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
- import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation}
-@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
-     )
-   }
- 
--  test("SPARK-41198: input row calculation with CTE") {
-+  test("SPARK-41198: input row calculation with CTE",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
-     withTable("parquet_tbl", "parquet_streaming_tbl") {
-       spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
-         .write.format("parquet").saveAsTable("parquet_tbl")
-@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
-     }
-   }
- 
--  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources") {
-+  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315";))
 {
-     withTable("parquet_streaming_tbl") {
-       val streamInput = MemoryStream[Int]
-       val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
 index 8f099c31e6b..ce4b7ad25b3 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 4e68a423a..909384c09 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -202,8 +202,14 @@ case class CometNativeScanExec(
 
   override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
 
-  override lazy val metrics: Map[String, SQLMetric] =
-    CometMetricNode.nativeScanMetrics(session.sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] = {
+    val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
+    // Map native metric names to Spark metric names
+    nativeMetrics.get("output_rows") match {
+      case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
+      case None => nativeMetrics
+    }
+  }
 
   /**
    * See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only used for tests.

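For context, the metrics change above is a straight aliasing pattern: Spark's
streaming ProgressReporter derives per-source input row counts from the
"numOutputRows" metric on leaf scan nodes, while Comet's native scan reports
the same counter under the native name "output_rows". The sketch below is a
minimal, self-contained illustration of that pattern, not the commit's code;
it assumes only spark-sql on the classpath, and the object and method names
(MetricAliasSketch, withNumOutputRowsAlias) are invented for the example.

    import org.apache.spark.sql.execution.metric.SQLMetric

    object MetricAliasSketch {
      // Expose the native "output_rows" counter under Spark's conventional
      // "numOutputRows" name. Both keys map to the same SQLMetric instance,
      // so a single accumulator update is visible through either lookup.
      def withNumOutputRowsAlias(metrics: Map[String, SQLMetric]): Map[String, SQLMetric] =
        metrics.get("output_rows") match {
          case Some(metric) => metrics + ("numOutputRows" -> metric)
          case None => metrics
        }

      def main(args: Array[String]): Unit = {
        val rows = new SQLMetric("sum") // SQLMetric(metricType, initValue = 0L)
        rows.add(42L)
        val aliased = withNumOutputRowsAlias(Map("output_rows" -> rows))
        assert(aliased("numOutputRows") eq aliased("output_rows")) // same accumulator
        println(aliased("numOutputRows").value) // prints 42
      }
    }

Keeping both keys, rather than renaming the native metric, means anything
already reading "output_rows" from CometNativeScanExec is unaffected.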
