This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 71f22f502 fix: enable more Spark SQL tests for `native_datafusion`
(`DynamicPartitionPruningSuite` / `ExplainSuite`) (#3694)
71f22f502 is described below
commit 71f22f502ca9f80eb34a5cc0fbd1d1b875c7846c
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 16 21:43:29 2026 -0600
fix: enable more Spark SQL tests for `native_datafusion`
(`DynamicPartitionPruningSuite` / `ExplainSuite`) (#3694)
---
dev/diffs/3.5.8.diff | 41 +++++++---------------
.../spark/sql/comet/CometNativeScanExec.scala | 40 ++++++++++++++++++++-
2 files changed, 51 insertions(+), 30 deletions(-)
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 3ed7d9ce1..619898e19 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -93,22 +93,23 @@ index 27ae10b3d59..78e69902dfd 100644
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
-index db587dd9868..aac7295a53d 100644
+index db587dd9868..33802f29253 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo {
// dump the file scan metadata (e.g file path) to event log
val metadata = plan match {
case fileScan: FileSourceScanExec => fileScan.metadata
+ case cometScan: CometScanExec => cometScan.metadata
++ case nativeScan: CometNativeScanExec => nativeScan.metadata
case _ => Map[String, String]()
}
new SparkPlanInfo(
@@ -396,14 +397,14 @@ index c4fb4fa943c..a04b23870a8 100644
assert(exchanges.size == 2)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..42eb9fd1cb7 100644
+index f33432ddb6f..4acdf7e9cfb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog,
InMemoryTableWithV2FilterCatalog}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
@@ -447,40 +448,22 @@ index f33432ddb6f..42eb9fd1cb7 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
val df = sql(
""" WITH v as (
-@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
- * Check the static scan metrics with and without DPP
- */
- test("static scan metrics",
-- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
-+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
- SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
-@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
+ case s: CometScanExec =>
++ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
++ case s: CometNativeScanExec =>
+ s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
case _ => false
}
assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..79813d8e259 100644
+index a206e97c353..fea1149b67d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
- }
- }
-
-- test("explain formatted - check presence of subquery in case of DPP") {
-+ test("explain formatted - check presence of subquery in case of DPP",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
- withTable("df1", "df2") {
- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
}
}
@@ -490,7 +473,7 @@ index a206e97c353..79813d8e259 100644
withTempDir { dir =>
Seq("parquet", "orc", "csv", "json").foreach { fmt =>
val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
}
}
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 909384c09..dcb975ac7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -78,6 +78,33 @@ case class CometNativeScanExec(
override val nodeName: String =
s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
+ override def verboseStringWithOperatorId(): String = {
+ val metadataStr = metadata.toSeq.sorted
+ .filterNot {
+ case (_, value) if (value.isEmpty || value.equals("[]")) => true
+ case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true
+ case (_, _) => false
+ }
+ .map {
+ case (key, _) if (key.equals("Location")) =>
+ val location = relation.location
+ val numPaths = location.rootPaths.length
+ val abbreviatedLocation = if (numPaths <= 1) {
+ location.rootPaths.mkString("[", ", ", "]")
+ } else {
+ "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+ }
+ s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
+ case (key, value) => s"$key: ${redact(value)}"
+ }
+
+ s"""
+ |$formattedNodeName
+ |${ExplainUtils.generateFieldString("Output", output)}
+ |${metadataStr.mkString("\n")}
+ |""".stripMargin
+ }
+
// exposed for testing
lazy val bucketedScan: Boolean = originalPlan.bucketedScan &&
!disableBucketedScan
@@ -202,13 +229,24 @@ case class CometNativeScanExec(
override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
+ private val driverMetricKeys =
+ Set(
+ "numFiles",
+ "filesSize",
+ "numPartitions",
+ "metadataTime",
+ "staticFilesNum",
+ "staticFilesSize",
+ "pruningTime")
+
override lazy val metrics: Map[String, SQLMetric] = {
val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
// Map native metric names to Spark metric names
- nativeMetrics.get("output_rows") match {
+ val withAlias = nativeMetrics.get("output_rows") match {
case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
case None => nativeMetrics
}
+ withAlias ++ scan.metrics.filterKeys(driverMetricKeys)
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]