This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new c27b1e10f fix: enable native_datafusion Spark SQL tests previously
ignored in #3315 (#3696)
c27b1e10f is described below
commit c27b1e10f4ba63ba9088720049517a9a1bdcd0da
Author: Andy Grove <[email protected]>
AuthorDate: Sun Mar 15 07:59:52 2026 -0600
fix: enable native_datafusion Spark SQL tests previously ignored in #3315
(#3696)
Add numOutputRows metric alias to CometNativeScanExec so Spark's streaming
ProgressReporter can find input row counts. Remove IgnoreCometNativeDataFusion
tags from three Spark SQL tests that now pass with native_datafusion scan.
---
dev/diffs/3.5.8.diff | 56 ++--------------------
.../spark/sql/comet/CometNativeScanExec.scala | 10 +++-
2 files changed, 13 insertions(+), 53 deletions(-)
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 3aaecdecb..08b69a6b8 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index fa1a64460fc..1d2e215d6a3 100644
+index fa1a64460fc..134f0db1fb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -17,6 +17,8 @@
@@ -1134,31 +1134,18 @@ index d269290e616..13726a31e07 100644
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..b7c234e1437 100644
+index cfc8b2cc845..c4be7eb3731 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
- import scala.collection.mutable.ArrayBuffer
-
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
--import org.apache.spark.sql.{AnalysisException, QueryTest}
-+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion,
QueryTest}
+ import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with
SharedSparkSession {
- }
- }
-
-- test("Fallback Parquet V2 to V1") {
-+ test("Fallback Parquet V2 to V1",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
- Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach {
format =>
- withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
- val commands = ArrayBuffer.empty[(String, LogicalPlan)]
-@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
+@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
val df = spark.read.format(format).load(path.getCanonicalPath)
checkAnswer(df, inputData.toDF())
assert(
@@ -2930,39 +2917,6 @@ index aad91601758..201083bd621 100644
})
}
-diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
-index b5cf13a9c12..ac17603fb7f 100644
----
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
-+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
-@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
-
- import org.apache.spark.{SparkException, TestUtils}
- import org.apache.spark.internal.Logging
--import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
Row, SaveMode}
-+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset,
IgnoreCometNativeDataFusion, Row, SaveMode}
- import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn,
Shuffle, Uuid}
- import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
CTERelationRef, LocalRelation}
-@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
- )
- }
-
-- test("SPARK-41198: input row calculation with CTE") {
-+ test("SPARK-41198: input row calculation with CTE",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
- withTable("parquet_tbl", "parquet_streaming_tbl") {
- spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
- .write.format("parquet").saveAsTable("parquet_tbl")
-@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
- }
- }
-
-- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2
streaming sources") {
-+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2
streaming sources",
-+
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315"))
{
- withTable("parquet_streaming_tbl") {
- val streamInput = MemoryStream[Int]
- val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS
value_stream")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
index 8f099c31e6b..ce4b7ad25b3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 4e68a423a..909384c09 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -202,8 +202,14 @@ case class CometNativeScanExec(
override def hashCode(): Int = Objects.hashCode(originalPlan,
serializedPlanOpt)
- override lazy val metrics: Map[String, SQLMetric] =
- CometMetricNode.nativeScanMetrics(session.sparkContext)
+ override lazy val metrics: Map[String, SQLMetric] = {
+ val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
+ // Map native metric names to Spark metric names
+ nativeMetrics.get("output_rows") match {
+ case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
+ case None => nativeMetrics
+ }
+ }
/**
* See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only
used for tests.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]