andygrove commented on code in PR #4093:
URL: https://github.com/apache/datafusion-comet/pull/4093#discussion_r3174885663
##########
dev/diffs/4.1.1.diff:
##########
@@ -0,0 +1,4326 @@
+diff --git
a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+index 6df8bc85b51..dabb75e2b75 100644
+--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
++++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+@@ -268,6 +268,11 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
+ }
+
+ test("Upload from all decommissioned executors") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is
incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle
storage")
+ sc = new SparkContext(getSparkConf(2, 2))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+@@ -298,6 +303,11 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
+ }
+
+ test("Upload multi stages") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is
incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle
storage")
+ sc = new SparkContext(getSparkConf())
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+@@ -332,6 +342,11 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
+
+ CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
+ test(s"$codec - Newly added executors should access old data from remote
storage") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is
incompatible with
++ // the fallback storage migration path used by
BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native
shuffle storage")
+ sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC,
codec))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+diff --git a/pom.xml b/pom.xml
+index dc757d78812..18841e95f3d 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -152,6 +152,8 @@
+ <kryo.version>4.0.3</kryo.version>
+ <ivy.version>2.5.3</ivy.version>
+ <oro.version>2.0.8</oro.version>
++ <spark.version.short>4.1</spark.version.short>
++ <comet.version>0.16.0-SNAPSHOT</comet.version>
+ <!--
+ If you change codahale.metrics.version, you also need to change
+ the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2594,6 +2596,25 @@
+ <artifactId>arpack</artifactId>
+ <version>${netlib.ludovic.dev.version}</version>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.datafusion</groupId>
++
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ <version>${comet.version}</version>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-sql_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-core_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
+ <!-- SPARK-16484 add `datasketches-java` for support Datasketches
HllSketch -->
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index d2d07a08aa9..d89f80e5b68 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -97,6 +97,10 @@
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.datafusion</groupId>
++
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ </dependency>
+
+ <!--
+ This spark-tags test-dep is needed even though it isn't used in this
module, otherwise testing-cmds that exclude
+diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+index c47e8454162..758d80a0bcb 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+@@ -1218,6 +1218,23 @@ object SparkSession extends SparkSessionCompanion with
Logging {
+ extensions
+ }
+
++ /**
++ * Whether Comet extension is enabled
++ */
++ def isCometEnabled: Boolean = {
++ val v = System.getenv("ENABLE_COMET")
++ v == null || v == "1" || v.toBooleanOption.getOrElse(false)
++ }
++
++
++ private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++ if (sparkContext.getConf.getBoolean("spark.comet.enabled",
isCometEnabled)) {
++ Seq("org.apache.comet.CometSparkSessionExtensions")
++ } else {
++ Seq.empty
++ }
++ }
++
+ /**
+ * Initialize extensions specified in [[StaticSQLConf]]. The classes will
be applied to the
+ * extensions passed into this function.
+@@ -1227,7 +1244,8 @@ object SparkSession extends SparkSessionCompanion with
Logging {
+ extensions: SparkSessionExtensions): SparkSessionExtensions = {
+ val extensionConfClassNames =
sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
+ .getOrElse(Seq.empty)
+- extensionConfClassNames.foreach { extensionConfClassName =>
++ val extensionClassNames = extensionConfClassNames ++
loadCometExtension(sparkContext)
++ extensionClassNames.foreach { extensionConfClassName =>
+ try {
+ val extensionConfClass = Utils.classForName(extensionConfClassName)
+ val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index 4410fe50912..43bcce2a038 100644
+---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.annotation.DeveloperApi
+ import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation,
LogicalPlan}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
+ import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+@@ -84,6 +85,7 @@ private[execution] object SparkPlanInfo {
+ // dump the file scan metadata (e.g file path) to event log
+ val metadata = plan match {
+ case fileScan: FileSourceScanLike => fileScan.metadata
++ case cometScan: CometScanExec => cometScan.metadata
+ case _ => Map[String, String]()
+ }
+ val childrenInfo = children.flatMap {
+diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+index 69b4001ff34..6fda691652d 100644
+---
a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
++++
b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+@@ -1,7 +1,7 @@
+ -- Automatically generated by SQLQueryTestSuite
+ -- !query
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -11,7 +11,7 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+ AS tab1(k, v)
+ -- !query analysis
+ CreateViewCommand `tab1`, SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -26,8 +26,8 @@ CreateViewCommand `tab1`, SELECT * FROM VALUES
+
+ -- !query
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+@@ -35,8 +35,8 @@ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+ AS tab2(k, v)
+ -- !query analysis
+ CreateViewCommand `tab2`, SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
+index 13bbd9d81b7..541cdfb1e04 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
+@@ -15,6 +15,12 @@
+ -- limitations under the License.
+ --
+
++-- TODO: Disabled because one of the tests fails on Spark 4.0
++-- TODO: https://github.com/apache/datafusion-comet/issues/1948
++-- The following query failed
++-- select /*+ COALESCE(1) */ id, a+b, a-b, a*b, a/b from decimals_test order
by id
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW t AS SELECT 1.0 as a, 0.0 as b;
+
+ -- division, remainder and pmod by 0 return NULL
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+index e28f0721a64..788b43c242a 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+@@ -1,3 +1,7 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
++-- EXCEPT ALL with GROUP BY returns incorrect results on Spark 4.1
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+ (0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1);
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 96dddafd82a..efedbbb3c0e 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ -- Test tables
+ CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
b/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
+index 1f53ca359fe..0f572f7c6ce 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
+@@ -1,3 +1,7 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4123)
++-- Comet native sort lacks row-format support for Struct(Map(...)) sort keys
++--SET spark.comet.enabled = false
++
+ -- This test file contains queries that test recursive types name resolution
in ORDER BY and HAVING clauses.
+
+ -- Alias type: String, Table column type: Struct
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql
b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+index 35128da97fd..25b873ae859 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/hll.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+@@ -1,3 +1,8 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4121)
++-- Comet's native scan rejects invalid UTF-8 byte sequences inserted into the
++-- string test table, which Spark allows.
++--SET spark.comet.enabled = false
++
+ -- Positive test cases
+ -- Create a table with some testing data.
+ DROP TABLE IF EXISTS t1;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+index 077caa5dd44..697457d4251 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+@@ -1,5 +1,9 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
++-- INTERSECT ALL with GROUP BY returns incorrect results on Spark 4.1
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -8,8 +12,8 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+ (null, null)
+ AS tab1(k, v);
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..162d5a817b6 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -6,6 +6,10 @@
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+
+ -- Test aggregate operator with codegen on and off.
++
++-- Floating-point precision difference between DataFusion and JVM for FILTER
aggregates
++--SET spark.comet.enabled = false
++
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+ --CONFIG_DIM1
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+
+ -- PostgreSQL implicitly casts string literals to data with integral types,
but
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ --
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+index 7c816d8a416..b1551a2b296 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+@@ -1,6 +1,23 @@
+ -- A test suite for IN LIMIT in parent side, subquery, and both predicate
subquery
+ -- It includes correlated cases.
+
++-- TODO: Disabled because one of the tests fails on Spark 4.0
++-- TODO: https://github.com/apache/datafusion-comet/issues/1948
++-- The following query failed
++-- SELECT Count(DISTINCT( t1a )),
++-- t1b
++-- FROM t1
++-- WHERE t1d NOT IN (SELECT t2d
++-- FROM t2
++-- WHERE t2b > t1b
++-- ORDER BY t2b DESC nulls first, t2d
++-- LIMIT 1
++-- OFFSET 1)
++-- GROUP BY t1b
++-- ORDER BY t1b NULLS last
++-- LIMIT 1
++-- OFFSET 1;
++--SET spark.comet.enabled = false
+ --CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=true
+ --CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=false
+
+@@ -61,6 +78,7 @@ WHERE t1a IN (SELECT t2a
+ WHERE t1d = t2d)
+ LIMIT 2;
+
++--SET spark.sql.cbo.enabled=true
+ -- correlated IN subquery
+ -- LIMIT on both parent and subquery sides
+ SELECT *
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+---
a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++
b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+
+diff --git
a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+
+ -- In COMPENSATION views get invalidated if the type can't cast
+diff --git
a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+index 44f95f225ab..361866fc298 100644
+--- a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
++++ b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+@@ -1,7 +1,7 @@
+ -- Automatically generated by SQLQueryTestSuite
+ -- !query
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -17,8 +17,8 @@ struct<>
+
+ -- !query
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+index 0d807aeae4d..6d7744e771b 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+@@ -49,7 +49,7 @@ import
org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp
+ import org.apache.spark.sql.execution.columnar._
+ import org.apache.spark.sql.execution.command.CommandUtils
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -534,7 +534,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
+ df.collect()
+ }
+ assert(
+- collect(df.queryExecution.executedPlan) { case e: ShuffleExchangeExec
=> e }.size == expected)
++ collect(df.queryExecution.executedPlan) {
++ case _: ShuffleExchangeLike => 1 }.size == expected)
+ }
+
+ test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
+@@ -1673,9 +1674,18 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
+ _.nodeName.contains("TableCacheQueryStage"))
+ val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+ _.nodeName.contains("AdaptiveSparkPlan"))
+- val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+- _.nodeName.contains("ResultQueryStage"))
+- aqePlanRoot.get.children.head.nodeName == "AQEShuffleRead"
++ // Spark 4.0 wraps results in ResultQueryStage. The coalescing
indicator is AQEShuffleRead
++ // as the direct child of InputAdapter.
++ // AdaptiveSparkPlan -> ResultQueryStage -> WholeStageCodegen ->
++ // CometColumnarToRow -> InputAdapter -> AQEShuffleRead (if
coalesced)
++ val resultStage = aqeNode.get.children.head // ResultQueryStage
++ val wsc = resultStage.children.head // WholeStageCodegen
++ val c2r = wsc.children.head // ColumnarToRow or
CometColumnarToRow
++ val inputAdapter = c2r.children.head // InputAdapter
++ resultStage.nodeName == "ResultQueryStage" &&
++ wsc.nodeName.startsWith("WholeStageCodegen") && // could be
"WholeStageCodegen (1)"
++ (c2r.nodeName == "CometColumnarToRow" || c2r.nodeName ==
"ColumnarToRow") &&
++ inputAdapter.children.head.nodeName == "AQEShuffleRead"
+ }
+
+ withTempView("t0", "t1", "t2") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+index 0dfd37ebeae..66340218c7c 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+@@ -31,7 +31,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -856,7 +856,7 @@ class DataFrameAggregateSuite extends QueryTest
+ assert(objHashAggPlans.nonEmpty)
+
+ val exchangePlans = collect(aggPlan) {
+- case shuffle: ShuffleExchangeExec => shuffle
++ case shuffle: ShuffleExchangeLike => shuffle
+ }
+ assert(exchangePlans.length == 1)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index ed182322aec..1ae6afa686a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+
+ withTempDatabase { dbName =>
+ withTable(table1Name, table2Name) {
+- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++ withSQLConf(
++ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++ "spark.comet.enabled" -> "false") {
+ spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+ spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+index 93ff7becaec..7b2871cc656 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql
+ import java.sql.{Date, Timestamp}
+ import java.util.Locale
+
++import org.apache.spark.sql.IgnoreComet
+ import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
+ import org.apache.spark.sql.catalyst.plans.logical.Union
+ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+@@ -1511,7 +1512,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-52921: union partitioning - reused shuffle") {
++ test("SPARK-52921: union partitioning - reused shuffle",
++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a",
"b", "c")
+ val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a",
"b", "c")
+@@ -1538,7 +1540,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-52921: union partitioning - semantic equality") {
++ test("SPARK-52921: union partitioning - semantic equality",
++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+ val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b",
"c")
+ val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e",
"f")
+
+@@ -1589,7 +1592,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-52921: union partitioning - range partitioning") {
++ test("SPARK-52921: union partitioning - range partitioning",
++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+ val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b",
"c")
+ val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e",
"f")
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+index 5b88eeefeca..d4f07bc182a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+@@ -36,11 +36,12 @@ import
org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
+ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+ import org.apache.spark.sql.catalyst.parser.ParseException
+ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode,
LocalRelation, LogicalPlan, OneRowRelation}
++import org.apache.spark.sql.comet.CometBroadcastExchangeExec
+ import org.apache.spark.sql.connector.FakeV2Provider
+ import org.apache.spark.sql.execution.{FilterExec, LogicalRDD,
QueryExecution, SortExec, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.expressions.{Aggregator, Window}
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -1493,7 +1494,7 @@ class DataFrameSuite extends QueryTest
+ fail("Should not have back to back Aggregates")
+ }
+ atFirstAgg = true
+- case e: ShuffleExchangeExec => atFirstAgg = false
++ case e: ShuffleExchangeLike => atFirstAgg = false
+ case _ =>
+ }
+ }
+@@ -1683,7 +1684,7 @@ class DataFrameSuite extends QueryTest
+ checkAnswer(join, df)
+ assert(
+ collect(join.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => true }.size === 1)
++ case _: ShuffleExchangeLike => true }.size === 1)
+ assert(
+ collect(join.queryExecution.executedPlan) { case e:
ReusedExchangeExec => true }.size === 1)
+ val broadcasted = broadcast(join)
+@@ -1691,10 +1692,12 @@ class DataFrameSuite extends QueryTest
+ checkAnswer(join2, df)
+ assert(
+ collect(join2.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => true }.size == 1)
++ case _: ShuffleExchangeLike => true }.size == 1)
+ assert(
+ collect(join2.queryExecution.executedPlan) {
+- case e: BroadcastExchangeExec => true }.size === 1)
++ case e: BroadcastExchangeExec => true
++ case _: CometBroadcastExchangeExec => true
++ }.size === 1)
+ assert(
+ collect(join2.queryExecution.executedPlan) { case e:
ReusedExchangeExec => true }.size == 4)
+ }
+@@ -2092,7 +2095,7 @@ class DataFrameSuite extends QueryTest
+
+ // Assert that no extra shuffle introduced by cogroup.
+ val exchanges = collect(df3.queryExecution.executedPlan) {
+- case h: ShuffleExchangeExec => h
++ case h: ShuffleExchangeLike => h
+ }
+ assert(exchanges.size == 2)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 6e9f3385571..15615f3e417 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS,
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS,
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator,
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS,
_) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++ case CometShuffleExchangeExec(op: HashPartitioning, _, _,
ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s,
Seq("key1", "key2"))
++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s,
Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+index 6df8d66ee7f..35e270c7241 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+@@ -46,7 +46,7 @@ import
org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
+ import org.apache.spark.sql.catalyst.util.sideBySide
+ import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+ import org.apache.spark.sql.expressions.UserDefinedFunction
+ import org.apache.spark.sql.functions._
+@@ -2420,7 +2420,7 @@ class DatasetSuite extends QueryTest
+
+ // Assert that no extra shuffle introduced by cogroup.
+ val exchanges = collect(df3.queryExecution.executedPlan) {
+- case h: ShuffleExchangeExec => h
++ case h: ShuffleExchangeLike => h
+ }
+ assert(exchanges.size == 2)
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index e1a2fd33c7c..9a93daa8f5a 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec,
CometSubqueryBroadcastExec}
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog,
InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -193,6 +194,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ val subqueryBroadcast = dpExprs.collect {
+ case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b
++ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b
+ }
+
+ val hasFilter = if (withSubquery) "Should" else "Shouldn't"
+@@ -247,6 +249,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ val buf =
collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
+ case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) =>
+ b.indices.map(idx => b.buildKeys(idx))
++ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) =>
++ b.indices.map(idx => b.buildKeys(idx))
+ }
+ assert(buf.distinct.size == n)
+ }
+@@ -262,6 +266,12 @@ abstract class DynamicPartitionPruningSuiteBase
+ case s: BatchScanExec => s.runtimeFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
++ case s: CometNativeScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case _ => Nil
+ }
+ }
+@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase
+
+ val plan = df.queryExecution.executedPlan
+ val countSubqueryBroadcasts =
+- collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1
}).sum
++ collectWithSubqueries(plan)({
++ case _: SubqueryBroadcastExec => 1
++ case _: CometSubqueryBroadcastExec => 1
++ }).sum
+
+ val countReusedSubqueryBroadcasts =
+- collectWithSubqueries(plan)({ case ReusedSubqueryExec(_:
SubqueryBroadcastExec) => 1}).sum
++ collectWithSubqueries(plan)({
++ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
++ case ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => 1
++ }).sum
+
+ assert(countSubqueryBroadcasts == 1)
+ assert(countReusedSubqueryBroadcasts == 1)
+@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+- "canonicalization and exchange reuse") {
++ "canonicalization and exchange reuse",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/4045")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
+@@ -1331,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("Subquery reuse across the whole plan",
++
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -1425,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++ test("SPARK-34637: DPP side broadcast query stage is created firstly",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/4045")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ val df = sql(
+ """ WITH v as (
+@@ -1579,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase
+
+ val subqueryBroadcastExecs =
collectWithSubqueries(df.queryExecution.executedPlan) {
+ case s: SubqueryBroadcastExec => s
++ case s: CometSubqueryBroadcastExec => s
+ }
+ assert(subqueryBroadcastExecs.size === 1)
+ subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
+@@ -1731,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+ case s: BatchScanExec =>
+ // we use f1 col for v2 tables due to schema pruning
+ s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
++ case s: CometScanExec =>
++ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
++ case s: CometNativeScanExec =>
++ s.output.exists(_.exists(_.argString(maxFields =
100).contains("fid")))
+ case _ => false
+ }
+ assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index b27122a8de2..a4c5aac8212 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -470,7 +470,8 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("Explain formatted output for scan operator for datasource V2") {
++ test("Explain formatted output for scan operator for datasource V2",
++ IgnoreComet("Comet explain output is different")) {
+ withTempDir { dir =>
+ Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+ val basePath = dir.getCanonicalPath + "/" + fmt
+@@ -548,7 +549,9 @@ class ExplainSuite extends ExplainSuiteHelper with
DisableAdaptiveExecutionSuite
+ }
+ }
+
+-class ExplainSuiteAE extends ExplainSuiteHelper with
EnableAdaptiveExecutionSuite {
++// Ignored when Comet is enabled. Comet changes expected query plans.
++class ExplainSuiteAE extends ExplainSuiteHelper with
EnableAdaptiveExecutionSuite
++ with IgnoreCometSuite {
+ import testImplicits._
+
+ test("SPARK-35884: Explain Formatted") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+index 95e86fe4311..0f7ed3271d4 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+@@ -33,6 +33,8 @@ import
org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+ import
org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt,
positiveInt}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
+ import org.apache.spark.sql.catalyst.types.DataTypeUtils
++import org.apache.spark.sql.catalyst.util.quietly
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec,
CometScanExec, CometSortMergeJoinExec}
+ import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.FilePartition
+@@ -204,7 +206,11 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+
+ allFileBasedDataSources.foreach { format =>
+- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
++ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
++ Seq(IgnoreCometNativeDataFusion(
++ "https://github.com/apache/datafusion-comet/issues/3314"))
++ } else Seq.empty
++ test(s"Enabling/disabling ignoreMissingFiles using $format",
ignoreMissingTags: _*) { quietly {
+ def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+@@ -264,7 +270,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+ }
+ }
+- }
++ }}
+ }
+
+ Seq("json", "orc").foreach { format =>
+@@ -655,18 +661,25 @@ class FileBasedDataSourceSuite extends QueryTest
+ checkAnswer(sql(s"select A from $tableName"), data.select("A"))
+
+ // RuntimeException is triggered at executor side, which is then
wrapped as
+- // SparkException at driver side
++ // SparkException at driver side. Comet native readers throw
++ // SparkRuntimeException directly without the SparkException
wrapper.
++ def getDuplicateFieldError(query: String): SparkRuntimeException
= {
++ try {
++ sql(query).collect()
++ fail("Expected an
exception").asInstanceOf[SparkRuntimeException]
++ } catch {
++ case e: SparkException =>
++ e.getCause.asInstanceOf[SparkRuntimeException]
++ case e: SparkRuntimeException => e
++ }
++ }
+ checkError(
+- exception = intercept[SparkException] {
+- sql(s"select b from $tableName").collect()
+- }.getCause.asInstanceOf[SparkRuntimeException],
++ exception = getDuplicateFieldError(s"select b from $tableName"),
+ condition = "_LEGACY_ERROR_TEMP_2093",
+ parameters = Map("requiredFieldName" -> "b", "matchedOrcFields"
-> "[b, B]")
+ )
+ checkError(
+- exception = intercept[SparkException] {
+- sql(s"select B from $tableName").collect()
+- }.getCause.asInstanceOf[SparkRuntimeException],
++ exception = getDuplicateFieldError(s"select B from $tableName"),
+ condition = "_LEGACY_ERROR_TEMP_2093",
+ parameters = Map("requiredFieldName" -> "b", "matchedOrcFields"
-> "[b, B]")
+ )
+@@ -954,6 +967,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ assert(bJoinExec.isEmpty)
+ val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
+ case smJoin: SortMergeJoinExec => smJoin
++ case smJoin: CometSortMergeJoinExec => smJoin
+ }
+ assert(smJoinExec.nonEmpty)
+ }
+@@ -1014,6 +1028,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _,
_), _, _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.nonEmpty)
+@@ -1055,6 +1070,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _) => f
++ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _,
_), _, _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.isEmpty)
+@@ -1239,6 +1255,9 @@ class FileBasedDataSourceSuite extends QueryTest
+ val filters = df.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f.dataFilters
+ case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
++ case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
++ case b: CometBatchScanExec =>
b.scan.asInstanceOf[FileScan].dataFilters
+ }.flatten
+ assert(filters.contains(GreaterThan(scan.logicalPlan.output.head,
Literal(5L))))
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
+new file mode 100644
+index 00000000000..5691536c114
+--- /dev/null
++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
+@@ -0,0 +1,45 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql
++
++import org.scalactic.source.Position
++import org.scalatest.Tag
++
++import org.apache.spark.sql.test.SQLTestUtils
++
++/**
++ * Tests with this tag will be ignored when Comet is enabled (e.g., via
`ENABLE_COMET`).
++ */
++case class IgnoreComet(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeIcebergCompat(reason: String) extends
Tag("DisableComet")
++case class IgnoreCometNativeDataFusion(reason: String) extends
Tag("DisableComet")
++case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
++
++/**
++ * Helper trait that ignores (skips) every test in a suite when Comet is
enabled, regardless of default config values.
++ */
++trait IgnoreCometSuite extends SQLTestUtils {
++ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
++ (implicit pos: Position): Unit = {
++ if (isCometEnabled) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
++ }
++}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+index 7d7185ae6c1..442a5bddeb8 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+@@ -442,7 +442,8 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
+ }
+
+ test("Runtime bloom filter join: do not add bloom filter if dpp filter
exists " +
+- "on the same column") {
++ "on the same column",
++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
+
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+ assertDidNotRewriteWithBloomFilter("select * from bf5part join bf2 on "
+
+@@ -451,7 +452,8 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
+ }
+
+ test("Runtime bloom filter join: add bloom filter if dpp filter exists on "
+
+- "a different column") {
++ "a different column",
++ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
Review Comment:
PR to enable these in all diffs:
https://github.com/apache/datafusion-comet/pull/4178
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]