andygrove commented on code in PR #4093:
URL: https://github.com/apache/datafusion-comet/pull/4093#discussion_r3174805795
##########
dev/diffs/4.1.1.diff:
##########
@@ -0,0 +1,4326 @@
+diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+index 6df8bc85b51..dabb75e2b75 100644
+--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
++++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+@@ -268,6 +268,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+ }
+
+ test("Upload from all decommissioned executors") {
++    // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++    // the fallback storage migration path used by BlockManagerDecommissioner.
++    val cometEnv = System.getenv("ENABLE_COMET")
++    assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++      "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+ sc = new SparkContext(getSparkConf(2, 2))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+@@ -298,6 +303,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+   }
+ 
+   test("Upload multi stages") {
++    // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++    // the fallback storage migration path used by BlockManagerDecommissioner.
++    val cometEnv = System.getenv("ENABLE_COMET")
++    assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++      "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+ sc = new SparkContext(getSparkConf())
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+@@ -332,6 +342,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+ 
+   CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
+     test(s"$codec - Newly added executors should access old data from remote storage") {
++      // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++      // the fallback storage migration path used by BlockManagerDecommissioner.
++      val cometEnv = System.getenv("ENABLE_COMET")
++      assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++        "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+       sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+diff --git a/pom.xml b/pom.xml
+index dc757d78812..18841e95f3d 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -152,6 +152,8 @@
+ <kryo.version>4.0.3</kryo.version>
+ <ivy.version>2.5.3</ivy.version>
+ <oro.version>2.0.8</oro.version>
++ <spark.version.short>4.1</spark.version.short>
++ <comet.version>0.16.0-SNAPSHOT</comet.version>
+ <!--
+ If you change codahale.metrics.version, you also need to change
+ the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2594,6 +2596,25 @@
+ <artifactId>arpack</artifactId>
+ <version>${netlib.ludovic.dev.version}</version>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.datafusion</groupId>
++        <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ <version>${comet.version}</version>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-sql_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-core_${scala.binary.version}</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.apache.spark</groupId>
++ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
+     <!-- SPARK-16484 add `datasketches-java` for support Datasketches HllSketch -->
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index d2d07a08aa9..d89f80e5b68 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -97,6 +97,10 @@
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
++ <dependency>
++ <groupId>org.apache.datafusion</groupId>
++      <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++ </dependency>
+
+ <!--
+       This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+index c47e8454162..758d80a0bcb 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+@@ -1218,6 +1218,23 @@ object SparkSession extends SparkSessionCompanion with Logging {
+ extensions
+ }
+
++ /**
++ * Whether Comet extension is enabled
++ */
++ def isCometEnabled: Boolean = {
++ val v = System.getenv("ENABLE_COMET")
++ v == null || v == "1" || v.toBoolean
++ }
++
++
++ private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++    if (sparkContext.getConf.getBoolean("spark.comet.enabled", isCometEnabled)) {
++ Seq("org.apache.comet.CometSparkSessionExtensions")
++ } else {
++ Seq.empty
++ }
++ }
++
+ /**
+    * Initialize extensions specified in [[StaticSQLConf]]. The classes will be applied to the
+ * extensions passed into this function.
+@@ -1227,7 +1244,8 @@ object SparkSession extends SparkSessionCompanion with Logging {
+ extensions: SparkSessionExtensions): SparkSessionExtensions = {
+     val extensionConfClassNames = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
+ .getOrElse(Seq.empty)
+- extensionConfClassNames.foreach { extensionConfClassName =>
++    val extensionClassNames = extensionConfClassNames ++ loadCometExtension(sparkContext)
++ extensionClassNames.foreach { extensionConfClassName =>
+ try {
+ val extensionConfClass = Utils.classForName(extensionConfClassName)
+ val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index 4410fe50912..43bcce2a038 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.annotation.DeveloperApi
+ import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, LogicalPlan}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+ import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+@@ -84,6 +85,7 @@ private[execution] object SparkPlanInfo {
+ // dump the file scan metadata (e.g file path) to event log
+ val metadata = plan match {
+ case fileScan: FileSourceScanLike => fileScan.metadata
++ case cometScan: CometScanExec => cometScan.metadata
+ case _ => Map[String, String]()
+ }
+ val childrenInfo = children.flatMap {
+diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+index 69b4001ff34..6fda691652d 100644
+--- a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
++++ b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+@@ -1,7 +1,7 @@
+ -- Automatically generated by SQLQueryTestSuite
+ -- !query
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -11,7 +11,7 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+ AS tab1(k, v)
+ -- !query analysis
+ CreateViewCommand `tab1`, SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -26,8 +26,8 @@ CreateViewCommand `tab1`, SELECT * FROM VALUES
+
+ -- !query
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+@@ -35,8 +35,8 @@ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+ AS tab2(k, v)
+ -- !query analysis
+ CreateViewCommand `tab2`, SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
+index 13bbd9d81b7..541cdfb1e04 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
+@@ -15,6 +15,12 @@
+ -- limitations under the License.
+ --
+
++-- TODO: Disabled because one of the tests fails on Spark 4.0
++-- TODO: https://github.com/apache/datafusion-comet/issues/1948
++-- The following query fails:
++-- select /*+ COALESCE(1) */ id, a+b, a-b, a*b, a/b from decimals_test order by id
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW t AS SELECT 1.0 as a, 0.0 as b;
+
+ -- division, remainder and pmod by 0 return NULL
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+index e28f0721a64..788b43c242a 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+@@ -1,3 +1,7 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
++-- EXCEPT ALL with GROUP BY returns incorrect results on Spark 4.1
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+ (0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1);
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 96dddafd82a..efedbbb3c0e 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+
+ -- Test tables
+ CREATE table explain_temp1 (key int, val int) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql b/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
+index 1f53ca359fe..0f572f7c6ce 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
+@@ -1,3 +1,7 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4123)
++-- Comet native sort lacks row-format support for Struct(Map(...)) sort keys
++--SET spark.comet.enabled = false
++
+ -- This test file contains queries that test recursive types name resolution in ORDER BY and HAVING clauses.
+
+ -- Alias type: String, Table column type: Struct
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+index 35128da97fd..25b873ae859 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/hll.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+@@ -1,3 +1,8 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4121)
++-- Comet's native scan rejects invalid UTF-8 byte sequences inserted into the
++-- string test table, which Spark allows.
++--SET spark.comet.enabled = false
++
+ -- Positive test cases
+ -- Create a table with some testing data.
+ DROP TABLE IF EXISTS t1;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+index 077caa5dd44..697457d4251 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+@@ -1,5 +1,9 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
++-- INTERSECT ALL with GROUP BY returns incorrect results on Spark 4.1
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -8,8 +12,8 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+ (null, null)
+ AS tab1(k, v);
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..162d5a817b6 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -6,6 +6,10 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+
+ -- Test aggregate operator with codegen on and off.
++
++-- Floating-point precision difference between DataFusion and JVM for FILTER aggregates
++--SET spark.comet.enabled = false
++
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+
+ -- PostgreSQL implicitly casts string literals to data with integral types, but
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+index 7c816d8a416..b1551a2b296 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+@@ -1,6 +1,23 @@
+ -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
+ -- It includes correlated cases.
+
++-- TODO: Disabled because one of the tests fails on Spark 4.0
++-- TODO: https://github.com/apache/datafusion-comet/issues/1948
++-- The following query fails:
++-- SELECT Count(DISTINCT( t1a )),
++-- t1b
++-- FROM t1
++-- WHERE t1d NOT IN (SELECT t2d
++-- FROM t2
++-- WHERE t2b > t1b
++-- ORDER BY t2b DESC nulls first, t2d
++-- LIMIT 1
++-- OFFSET 1)
++-- GROUP BY t1b
++-- ORDER BY t1b NULLS last
++-- LIMIT 1
++-- OFFSET 1;
++--SET spark.comet.enabled = false
+ --CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=true
+ --CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=false
+
+@@ -61,6 +78,7 @@ WHERE t1a IN (SELECT t2a
+ WHERE t1d = t2d)
+ LIMIT 2;
+
++--SET spark.sql.cbo.enabled=true
+ -- correlated IN subquery
+ -- LIMIT on both parent and subquery sides
+ SELECT *
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+
+ -- In COMPENSATION views get invalidated if the type can't cast
+diff --git a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+index 44f95f225ab..361866fc298 100644
+--- a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
++++ b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+@@ -1,7 +1,7 @@
+ -- Automatically generated by SQLQueryTestSuite
+ -- !query
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+- (1, 2),
++ (1, 2),
+ (1, 2),
+ (1, 3),
+ (1, 3),
+@@ -17,8 +17,8 @@ struct<>
+
+ -- !query
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+- (1, 2),
+- (1, 2),
++ (1, 2),
++ (1, 2),
+ (2, 3),
+ (3, 4),
+ (null, null),
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+index 0d807aeae4d..6d7744e771b 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+@@ -49,7 +49,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp
+ import org.apache.spark.sql.execution.columnar._
+ import org.apache.spark.sql.execution.command.CommandUtils
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -534,7 +534,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
+ df.collect()
+ }
+ assert(
+-      collect(df.queryExecution.executedPlan) { case e: ShuffleExchangeExec => e }.size == expected)
++ collect(df.queryExecution.executedPlan) {
++ case _: ShuffleExchangeLike => 1 }.size == expected)
+ }
+
+   test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
+@@ -1673,9 +1674,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
+ _.nodeName.contains("TableCacheQueryStage"))
+ val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+ _.nodeName.contains("AdaptiveSparkPlan"))
+- val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+- _.nodeName.contains("ResultQueryStage"))
+- aqePlanRoot.get.children.head.nodeName == "AQEShuffleRead"
++      // Spark 4.0 wraps results in ResultQueryStage. The coalescing indicator is AQEShuffleRead
++      // as the direct child of InputAdapter.
++      // AdaptiveSparkPlan -> ResultQueryStage -> WholeStageCodegen ->
++      //   CometColumnarToRow -> InputAdapter -> AQEShuffleRead (if coalesced)
++      val resultStage = aqeNode.get.children.head // ResultQueryStage
++      val wsc = resultStage.children.head // WholeStageCodegen
++      val c2r = wsc.children.head // ColumnarToRow or CometColumnarToRow
++      val inputAdapter = c2r.children.head // InputAdapter
++      resultStage.nodeName == "ResultQueryStage" &&
++        wsc.nodeName.startsWith("WholeStageCodegen") && // could be "WholeStageCodegen (1)"
++        (c2r.nodeName == "CometColumnarToRow" || c2r.nodeName == "ColumnarToRow") &&
++ inputAdapter.children.head.nodeName == "AQEShuffleRead"
+ }
+
+ withTempView("t0", "t1", "t2") {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+index 0dfd37ebeae..66340218c7c 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+@@ -31,7 +31,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -856,7 +856,7 @@ class DataFrameAggregateSuite extends QueryTest
+ assert(objHashAggPlans.nonEmpty)
+
+ val exchangePlans = collect(aggPlan) {
+- case shuffle: ShuffleExchangeExec => shuffle
++ case shuffle: ShuffleExchangeLike => shuffle
+ }
+ assert(exchangePlans.length == 1)
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index ed182322aec..1ae6afa686a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+
+ withTempDatabase { dbName =>
+ withTable(table1Name, table2Name) {
+- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++ withSQLConf(
++ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++ "spark.comet.enabled" -> "false") {
+ spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+ spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+index 93ff7becaec..7b2871cc656 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql
+ import java.sql.{Date, Timestamp}
+ import java.util.Locale
+
++import org.apache.spark.sql.IgnoreComet
+ import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
+ import org.apache.spark.sql.catalyst.plans.logical.Union
+ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+@@ -1511,7 +1512,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-52921: union partitioning - reused shuffle") {
++ test("SPARK-52921: union partitioning - reused shuffle",
++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+       val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+       val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+@@ -1538,7 +1540,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-52921: union partitioning - semantic equality") {
++ test("SPARK-52921: union partitioning - semantic equality",
++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+     val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+     val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
+
+@@ -1589,7 +1592,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+ }
+ }
+
+- test("SPARK-52921: union partitioning - range partitioning") {
++ test("SPARK-52921: union partitioning - range partitioning",
++ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+     val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+     val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
+
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+index 5b88eeefeca..d4f07bc182a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+@@ -36,11 +36,12 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
+ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+ import org.apache.spark.sql.catalyst.parser.ParseException
+ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LocalRelation, LogicalPlan, OneRowRelation}
++import org.apache.spark.sql.comet.CometBroadcastExchangeExec
+ import org.apache.spark.sql.connector.FakeV2Provider
+ import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.expressions.{Aggregator, Window}
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -1493,7 +1494,7 @@ class DataFrameSuite extends QueryTest
+ fail("Should not have back to back Aggregates")
+ }
+ atFirstAgg = true
+- case e: ShuffleExchangeExec => atFirstAgg = false
++ case e: ShuffleExchangeLike => atFirstAgg = false
+ case _ =>
+ }
+ }
+@@ -1683,7 +1684,7 @@ class DataFrameSuite extends QueryTest
+ checkAnswer(join, df)
+ assert(
+ collect(join.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => true }.size === 1)
++ case _: ShuffleExchangeLike => true }.size === 1)
+ assert(
+       collect(join.queryExecution.executedPlan) { case e: ReusedExchangeExec => true }.size === 1)
+ val broadcasted = broadcast(join)
+@@ -1691,10 +1692,12 @@ class DataFrameSuite extends QueryTest
+ checkAnswer(join2, df)
+ assert(
+ collect(join2.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => true }.size == 1)
++ case _: ShuffleExchangeLike => true }.size == 1)
+ assert(
+ collect(join2.queryExecution.executedPlan) {
+- case e: BroadcastExchangeExec => true }.size === 1)
++ case e: BroadcastExchangeExec => true
++ case _: CometBroadcastExchangeExec => true
++ }.size === 1)
+ assert(
+       collect(join2.queryExecution.executedPlan) { case e: ReusedExchangeExec => true }.size == 4)
+ }
+@@ -2092,7 +2095,7 @@ class DataFrameSuite extends QueryTest
+
+ // Assert that no extra shuffle introduced by cogroup.
+ val exchanges = collect(df3.queryExecution.executedPlan) {
+- case h: ShuffleExchangeExec => h
++ case h: ShuffleExchangeLike => h
+ }
+ assert(exchanges.size == 2)
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 6e9f3385571..15615f3e417 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+     case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) =>
+       partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++    case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+-          case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
++          case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+index 6df8d66ee7f..35e270c7241 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
+ import org.apache.spark.sql.catalyst.util.sideBySide
+ import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+ import org.apache.spark.sql.expressions.UserDefinedFunction
+ import org.apache.spark.sql.functions._
+@@ -2420,7 +2420,7 @@ class DatasetSuite extends QueryTest
+
+ // Assert that no extra shuffle introduced by cogroup.
+ val exchanges = collect(df3.queryExecution.executedPlan) {
+- case h: ShuffleExchangeExec => h
++ case h: ShuffleExchangeLike => h
+ }
+ assert(exchanges.size == 2)
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index e1a2fd33c7c..9a93daa8f5a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec, CometSubqueryBroadcastExec}
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -193,6 +194,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ val subqueryBroadcast = dpExprs.collect {
+ case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b
++ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b
+ }
+
+ val hasFilter = if (withSubquery) "Should" else "Shouldn't"
+@@ -247,6 +249,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
+ case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) =>
+ b.indices.map(idx => b.buildKeys(idx))
++ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) =>
++ b.indices.map(idx => b.buildKeys(idx))
+ }
+ assert(buf.distinct.size == n)
+ }
+@@ -262,6 +266,12 @@ abstract class DynamicPartitionPruningSuiteBase
+ case s: BatchScanExec => s.runtimeFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
++ case s: CometScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
++ case s: CometNativeScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
++ }
+ case _ => Nil
+ }
+ }
+@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase
+
+ val plan = df.queryExecution.executedPlan
+ val countSubqueryBroadcasts =
+-      collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 }).sum
++ collectWithSubqueries(plan)({
++ case _: SubqueryBroadcastExec => 1
++ case _: CometSubqueryBroadcastExec => 1
++ }).sum
+
+ val countReusedSubqueryBroadcasts =
+-      collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1}).sum
++ collectWithSubqueries(plan)({
++ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
++ case ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => 1
++ }).sum
+
+ assert(countSubqueryBroadcasts == 1)
+ assert(countReusedSubqueryBroadcasts == 1)
+@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+- "canonicalization and exchange reuse") {
++ "canonicalization and exchange reuse",
++    IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
+@@ -1331,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("Subquery reuse across the whole plan",
++    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -1425,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++ test("SPARK-34637: DPP side broadcast query stage is created firstly",
++    IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ val df = sql(
+ """ WITH v as (
+@@ -1579,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase
+
+     val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
+ case s: SubqueryBroadcastExec => s
++ case s: CometSubqueryBroadcastExec => s
+ }
+ assert(subqueryBroadcastExecs.size === 1)
+ subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
+@@ -1731,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+ case s: BatchScanExec =>
+ // we use f1 col for v2 tables due to schema pruning
+           s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
++        case s: CometScanExec =>
++          s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
++        case s: CometNativeScanExec =>
++          s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
+ case _ => false
+ }
+ assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index b27122a8de2..a4c5aac8212 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -470,7 +470,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
+ }
+ }
+
+- test("Explain formatted output for scan operator for datasource V2") {
++ test("Explain formatted output for scan operator for datasource V2",
++ IgnoreComet("Comet explain output is different")) {
+ withTempDir { dir =>
+ Seq("parquet", "orc", "csv", "json").foreach { fmt =>
+ val basePath = dir.getCanonicalPath + "/" + fmt
+@@ -548,7 +549,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
+ }
+ }
+
+-class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {
++// Ignored when Comet is enabled. Comet changes expected query plans.
++class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite
++ with IgnoreCometSuite {
+ import testImplicits._
+
+ test("SPARK-35884: Explain Formatted") {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+index 95e86fe4311..0f7ed3271d4 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+ import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
+ import org.apache.spark.sql.catalyst.plans.logical.Filter
+ import org.apache.spark.sql.catalyst.types.DataTypeUtils
++import org.apache.spark.sql.catalyst.util.quietly
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
+ import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.FilePartition
+@@ -204,7 +206,11 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+
+ allFileBasedDataSources.foreach { format =>
+- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
++ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
++ Seq(IgnoreCometNativeDataFusion(
++ "https://github.com/apache/datafusion-comet/issues/3314"))
++ } else Seq.empty
++    test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly {
+ def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+@@ -264,7 +270,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ }
+ }
+ }
+- }
++ }}
+ }
+
+ Seq("json", "orc").foreach { format =>
+@@ -655,18 +661,25 @@ class FileBasedDataSourceSuite extends QueryTest
+ checkAnswer(sql(s"select A from $tableName"), data.select("A"))
+
+         // RuntimeException is triggered at executor side, which is then wrapped as
+-        // SparkException at driver side
++        // SparkException at driver side. Comet native readers throw
++        // SparkRuntimeException directly without the SparkException wrapper.
++        def getDuplicateFieldError(query: String): SparkRuntimeException = {
++ try {
++ sql(query).collect()
++            fail("Expected an exception").asInstanceOf[SparkRuntimeException]
++          } catch {
++            case e: SparkException =>
++              e.getCause.asInstanceOf[SparkRuntimeException]
++ case e: SparkRuntimeException => e
++ }
++ }
+ checkError(
+- exception = intercept[SparkException] {
+- sql(s"select b from $tableName").collect()
+- }.getCause.asInstanceOf[SparkRuntimeException],
++ exception = getDuplicateFieldError(s"select b from $tableName"),
+ condition = "_LEGACY_ERROR_TEMP_2093",
+           parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
+ )
+ checkError(
+- exception = intercept[SparkException] {
+- sql(s"select B from $tableName").collect()
+- }.getCause.asInstanceOf[SparkRuntimeException],
++ exception = getDuplicateFieldError(s"select B from $tableName"),
+ condition = "_LEGACY_ERROR_TEMP_2093",
+           parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
+ )
+@@ -954,6 +967,7 @@ class FileBasedDataSourceSuite extends QueryTest
+ assert(bJoinExec.isEmpty)
+ val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
+ case smJoin: SortMergeJoinExec => smJoin
++ case smJoin: CometSortMergeJoinExec => smJoin
+ }
+ assert(smJoinExec.nonEmpty)
+ }
+@@ -1014,6 +1028,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _) => f
++          case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.nonEmpty)
+@@ -1055,6 +1070,7 @@ class FileBasedDataSourceSuite extends QueryTest
+
+ val fileScan = df.queryExecution.executedPlan collectFirst {
+ case BatchScanExec(_, f: FileScan, _, _, _, _) => f
++          case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
+ }
+ assert(fileScan.nonEmpty)
+ assert(fileScan.get.partitionFilters.isEmpty)
+@@ -1239,6 +1255,9 @@ class FileBasedDataSourceSuite extends QueryTest
+ val filters = df.queryExecution.executedPlan.collect {
+ case f: FileSourceScanLike => f.dataFilters
+ case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
++ case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
++        case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
+       }.flatten
+       assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
+new file mode 100644
+index 00000000000..5691536c114
+--- /dev/null
++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
+@@ -0,0 +1,45 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql
++
++import org.scalactic.source.Position
++import org.scalatest.Tag
++
++import org.apache.spark.sql.test.SQLTestUtils
++
++/**
++ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
++ */
++case class IgnoreComet(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
++case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
++
++/**
++ * Helper trait that disables Comet for all tests regardless of default config values.
++ */
++trait IgnoreCometSuite extends SQLTestUtils {
++  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
++ (implicit pos: Position): Unit = {
++ if (isCometEnabled) {
++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
++ } else {
++ super.test(testName, testTags: _*)(testFun)
++ }
++ }
++}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+index 7d7185ae6c1..442a5bddeb8 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+@@ -442,7 +442,8 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
+ }
+
+   test("Runtime bloom filter join: do not add bloom filter if dpp filter exists " +
+-    "on the same column") {
++    "on the same column",
++    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
+     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+       assertDidNotRewriteWithBloomFilter("select * from bf5part join bf2 on " +
+@@ -451,7 +452,8 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
+   }
+ 
+   test("Runtime bloom filter join: add bloom filter if dpp filter exists on " +
+-    "a different column") {
++    "a different column",
++    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
+     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+ assertRewroteWithBloomFilter("select * from bf5part join bf2 on " +
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+index 53e47f428c3..a55d8f0c161 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
+ import org.apache.spark.sql.catalyst.plans.PlanTest
+ import org.apache.spark.sql.catalyst.plans.logical._
+ import org.apache.spark.sql.catalyst.rules.RuleExecutor
++import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortMergeJoinExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.joins._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -362,6 +363,7 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
+ val executedPlan = df.queryExecution.executedPlan
+ val shuffleHashJoins = collect(executedPlan) {
+ case s: ShuffledHashJoinExec => s
++      case c: CometHashJoinExec => c.originalPlan.asInstanceOf[ShuffledHashJoinExec]
+ }
+ assert(shuffleHashJoins.size == 1)
+ assert(shuffleHashJoins.head.buildSide == buildSide)
+@@ -371,6 +373,7 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
+ val executedPlan = df.queryExecution.executedPlan
+ val shuffleMergeJoins = collect(executedPlan) {
+ case s: SortMergeJoinExec => s
++ case c: CometSortMergeJoinExec => c
+ }
+ assert(shuffleMergeJoins.size == 1)
+ }
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+index 885512d4d19..113ae17ad9f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
+ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
+ import org.apache.spark.sql.catalyst.plans.logical.{Filter, HintInfo, Join, JoinHint, NO_BROADCAST_AND_REPLICATION}
+-import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
++import org.apache.spark.sql.comet._
++import org.apache.spark.sql.execution.{BinaryExecNode, ColumnarToRowExec, FilterExec, InputAdapter, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.joins._
+@@ -805,7 +806,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ }
+ }
+
+- test("test SortMergeJoin (with spill)") {
++ test("test SortMergeJoin (with spill)",
++ IgnoreComet("TODO: Comet SMJ doesn't support spill yet")) {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {
+@@ -813,7 +815,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ }
+ }
+
+- test("SPARK-49386: test SortMergeJoin (with spill by size threshold)") {
++ test("SPARK-49386: test SortMergeJoin (with spill by size threshold)",
++ IgnoreComet("TODO: Comet SMJ doesn't support spill yet")) {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
+     SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> Int.MaxValue.toString,
+@@ -943,10 +946,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ val physical = df.queryExecution.sparkPlan
+ val physicalJoins = physical.collect {
+ case j: SortMergeJoinExec => j
++      case j: CometSortMergeJoinExec => j.originalPlan.asInstanceOf[SortMergeJoinExec]
+ }
+ val executed = df.queryExecution.executedPlan
+ val executedJoins = collect(executed) {
+ case j: SortMergeJoinExec => j
++      case j: CometSortMergeJoinExec => j.originalPlan.asInstanceOf[SortMergeJoinExec]
+ }
+     // This only applies to the above tested queries, in which a child SortMergeJoin always
+     // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
+@@ -1192,9 +1197,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
+ .groupBy($"k1").count()
+ .queryExecution.executedPlan
+-      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
++      assert(collect(plan) {
++        case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size === 1)
+ // No extra shuffle before aggregate
+- assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
++ assert(collect(plan) {
++ case _: ShuffleExchangeLike => true }.size === 2)
+ })
+ }
+
+@@ -1211,10 +1218,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+ .queryExecution
+ .executedPlan
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
++ assert(collect(plan) {
++        case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size === 2)
+       assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size === 1)
+       // No extra sort before last sort merge join
+-      assert(collect(plan) { case _: SortExec => true }.size === 3)
++      assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 3)
+ })
+
+ // Test shuffled hash join
+@@ -1224,10 +1232,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+ .queryExecution
+ .executedPlan
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+-      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
++      assert(collect(plan) {
++        case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size === 2)
++      assert(collect(plan) {
++        case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size === 1)
+       // No extra sort before last sort merge join
+-      assert(collect(plan) { case _: SortExec => true }.size === 3)
++      assert(collect(plan) {
++        case _: SortExec | _: CometSortExec => true }.size === 3)
+ })
+ }
+
+@@ -1318,12 +1329,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+ inputDFs.foreach { case (df1, df2, joinExprs) =>
+ val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full")
+ assert(collect(smjDF.queryExecution.executedPlan) {
+- case _: SortMergeJoinExec => true }.size === 1)
++        case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size === 1)
+ val smjResult = smjDF.collect()
+
+ val shjDF = df1.join(df2.hint("SHUFFLE_HASH"), joinExprs, "full")
+ assert(collect(shjDF.queryExecution.executedPlan) {
+- case _: ShuffledHashJoinExec => true }.size === 1)
++        case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size === 1)
+ // Same result between shuffled hash join and sort merge join
+ checkAnswer(shjDF, smjResult)
+ }
+@@ -1382,12 +1393,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+         val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter")
+ assert(collect(smjDF.queryExecution.executedPlan) {
+ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
+ }.size === 1)
+ val smjResult = smjDF.collect()
+
+         val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter")
+ assert(collect(shjDF.queryExecution.executedPlan) {
+ case _: ShuffledHashJoinExec => true
++ case _: CometHashJoinExec => true
+ }.size === 1)
+ // Same result between shuffled hash join and sort merge join
+ checkAnswer(shjDF, smjResult)
+@@ -1398,12 +1411,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+         val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter")
+ assert(collect(smjDF.queryExecution.executedPlan) {
+ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
+ }.size === 1)
+ val smjResult = smjDF.collect()
+
+ val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs,
"rightouter")
+ assert(collect(shjDF.queryExecution.executedPlan) {
+ case _: ShuffledHashJoinExec => true
++ case _: CometHashJoinExec => true
+ }.size === 1)
+ // Same result between shuffled hash join and sort merge join
+ checkAnswer(shjDF, smjResult)
+@@ -1447,13 +1462,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ assert(shjCodegenDF.queryExecution.executedPlan.collect {
+ case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
+ case WholeStageCodegenExec(ProjectExec(_, _ :
ShuffledHashJoinExec)) => true
++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(_:
CometHashJoinExec))) =>
++ true
++ case WholeStageCodegenExec(ColumnarToRowExec(
++ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec,
_)))) => true
++ case _: CometHashJoinExec => true
+ }.size === 1)
+ checkAnswer(shjCodegenDF, Seq.empty)
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ val shjNonCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" ===
$"k2", joinType)
+ assert(shjNonCodegenDF.queryExecution.executedPlan.collect {
+- case _: ShuffledHashJoinExec => true }.size === 1)
++ case _: ShuffledHashJoinExec => true
++ case _: CometHashJoinExec => true
++ }.size === 1)
+ checkAnswer(shjNonCodegenDF, Seq.empty)
+ }
+ }
+@@ -1501,7 +1523,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ val plan = sql(getAggQuery(selectExpr,
joinType)).queryExecution.executedPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
+ // Have shuffle before aggregation
+- assert(collect(plan) { case _: ShuffleExchangeExec => true }.size
=== 1)
++ assert(collect(plan) {
++ case _: ShuffleExchangeLike => true }.size === 1)
+ }
+
+ def getJoinQuery(selectExpr: String, joinType: String): String = {
+@@ -1530,9 +1553,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ }
+ val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size ===
3)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
++ }.size === 3)
+ // No extra sort on left side before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 5)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true
}.size === 5)
+ }
+
+ // Test output ordering is not preserved
+@@ -1541,9 +1567,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
+ val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
+ assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size ===
3)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
++ }.size === 3)
+ // Have sort on left side before last sort merge join
+- assert(collect(plan) { case _: SortExec => true }.size === 6)
++ assert(collect(plan) { case _: SortExec | _: CometSortExec => true
}.size === 6)
+ }
+
+ // Test singe partition
+@@ -1553,7 +1582,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
+ |""".stripMargin)
+ val plan = fullJoinDF.queryExecution.executedPlan
+- assert(collect(plan) { case _: ShuffleExchangeExec => true}.size == 1)
++ assert(collect(plan) {
++ case _: ShuffleExchangeLike => true}.size == 1)
+ checkAnswer(fullJoinDF, Row(100))
+ }
+ }
+@@ -1626,6 +1656,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+ Seq(semiJoinDF, antiJoinDF).foreach { df =>
+ assert(collect(df.queryExecution.executedPlan) {
+ case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey ==
ignoreDuplicatedKey => true
++ case j: CometHashJoinExec
++ if
j.originalPlan.asInstanceOf[ShuffledHashJoinExec].ignoreDuplicatedKey ==
++ ignoreDuplicatedKey => true
+ }.size == 1)
+ }
+ }
+@@ -1670,14 +1703,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+
+ test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SMJ)") {
+ def check(plan: SparkPlan): Unit = {
+- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1)
++ assert(collect(plan) {
++ case _: SortMergeJoinExec => true
++ case _: CometSortMergeJoinExec => true
++ }.size === 1)
+ }
+ dupStreamSideColTest("MERGE", check)
+ }
+
+ test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SHJ)") {
+ def check(plan: SparkPlan): Unit = {
+- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size ===
1)
++ assert(collect(plan) {
++ case _: ShuffledHashJoinExec => true
++ case _: CometHashJoinExec => true
++ }.size === 1)
+ }
+ dupStreamSideColTest("SHUFFLE_HASH", check)
+ }
+@@ -1813,7 +1852,8 @@ class ThreadLeakInSortMergeJoinSuite
+ sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+ }
+
+- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++ IgnoreComet("Comet SMJ doesn't spill yet")) {
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+index e4b5e10f7c3..c6efde09c8a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+@@ -69,7 +69,7 @@ import org.apache.spark.util.Utils
+ * }}}
+ */
+ // scalastyle:on line.size.limit
+-trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {
++trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with
IgnoreCometSuite {
+
+ protected val baseResourcePath = {
+ // use the same way as `SQLQueryTestSuite` to get the resource path
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+index 74cdee49e55..3decf393ed0 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+@@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
+ checkAnswer(sql("select -0.001"), Row(BigDecimal("-0.001")))
+ }
+
+- test("external sorting updates peak execution memory") {
++ test("external sorting updates peak execution memory",
++ IgnoreComet("TODO: native CometSort does not update peak execution
memory")) {
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external
sort") {
+ sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+index 23f0144dcec..df845f7295a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+@@ -166,7 +166,16 @@ class SQLQueryTestSuite extends QueryTest with
SharedSparkSession with SQLHelper
+ if (TestUtils.testCommandAvailable("/bin/bash")) Nil else
Set("transform.sql")
+ /** List of test cases to ignore, in lower cases. */
+ protected def ignoreList: Set[String] = Set(
+- "ignored.sql" // Do NOT remove this one. It is here to test the ignore
functionality.
++ "ignored.sql", // Do NOT remove this one. It is here to test the ignore
functionality.
++ // Comet: ORDER BY column has ties; row order is non-deterministic when
++ // running with high parallelism. Tracked for restoration once Comet
++ // produces stable ordering for these queries.
++ "replacing-missing-expression-with-alias.sql",
++ "in-set-operations.sql",
++ // Comet: theta_sketch_estimate(theta_sketch_agg(...)) over collated
++ // strings reads sketch binary as UTF-8 and fails. Skip until Comet
++ // adds proper handling or falls back for theta-sketch on collation.
++ "thetasketch.sql"
+ ) ++ otherIgnoreList
+ /** List of test cases that require TPCDS table schemas to be loaded. */
+ private def requireTPCDSCases: Seq[String] = Seq("pipe-operators.sql")
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+index 66826a9ca76..ab4265a5fb9 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+@@ -252,6 +252,8 @@ class SparkSessionExtensionSuite extends SparkFunSuite
with SQLHelper with Adapt
+ withSession(extensions) { session =>
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true)
+ session.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
++ // https://github.com/apache/datafusion-comet/issues/1197
++ session.conf.set("spark.comet.enabled", false)
+ assert(session.sessionState.columnarRules.contains(
+ MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
+ import session.implicits._
+@@ -310,6 +312,8 @@ class SparkSessionExtensionSuite extends SparkFunSuite
with SQLHelper with Adapt
+ }
+ withSession(extensions) { session =>
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, enableAQE)
++ // https://github.com/apache/datafusion-comet/issues/1197
++ session.conf.set("spark.comet.enabled", false)
+ assert(session.sessionState.columnarRules.contains(
+ MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
+ import session.implicits._
+@@ -348,6 +352,8 @@ class SparkSessionExtensionSuite extends SparkFunSuite
with SQLHelper with Adapt
+ val session = SparkSession.builder()
+ .master("local[1]")
+ .config(COLUMN_BATCH_SIZE.key, 2)
++ // https://github.com/apache/datafusion-comet/issues/1197
++ .config("spark.comet.enabled", false)
+ .withExtensions { extensions =>
+ extensions.injectColumnar(session =>
+ MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+index d7b2511eac2..d5f5b940b94 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+@@ -82,6 +82,10 @@ class SparkSessionJobTaggingAndCancellationSuite
+ }
+
+ test("Tags set from session are prefixed with session UUID") {
++ // This test relies on job scheduling order which is timing-dependent and
becomes unreliable
++ // when Comet is enabled due to changes in async execution behaviour.
++ assume(!classic.SparkSession.isCometEnabled,
++ "Skipped when Comet is enabled: test results are timing-dependent")
+ sc = new SparkContext("local[2]", "test")
+ val session =
classic.SparkSession.builder().sparkContext(sc).getOrCreate()
+ import session.implicits._
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+index ff0ee19ae97..01958e0c45b 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+@@ -17,6 +17,8 @@
+
+ package org.apache.spark.sql
+
++import org.apache.comet.CometConf
++
+ import org.apache.spark.{SPARK_DOC_ROOT, SparkIllegalArgumentException,
SparkRuntimeException}
+ import org.apache.spark.sql.catalyst.expressions.Cast._
+ import org.apache.spark.sql.catalyst.expressions.IsNotNull
+@@ -179,29 +181,31 @@ class StringFunctionsSuite extends QueryTest with
SharedSparkSession {
+ }
+
+ test("string regex_replace / regex_extract") {
+- val df = Seq(
+- ("100-200", "(\\d+)-(\\d+)", "300"),
+- ("100-200", "(\\d+)-(\\d+)", "400"),
+- ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
++ withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
++ val df = Seq(
++ ("100-200", "(\\d+)-(\\d+)", "300"),
++ ("100-200", "(\\d+)-(\\d+)", "400"),
++ ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
+
+- checkAnswer(
+- df.select(
+- regexp_replace($"a", "(\\d+)", "num"),
+- regexp_replace($"a", $"b", $"c"),
+- regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
+- Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
+- Row("num-num", "400-400", "100") :: Nil)
+-
+- // for testing the mutable state of the expression in code gen.
+- // This is a hack way to enable the codegen, thus the codegen is enable
by default,
+- // it will still use the interpretProjection if projection followed by a
LocalRelation,
+- // hence we add a filter operator.
+- // See the optimizer rule `ConvertToLocalRelation`
+- checkAnswer(
+- df.filter("isnotnull(a)").selectExpr(
+- "regexp_replace(a, b, c)",
+- "regexp_extract(a, b, 1)"),
+- Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") :: Nil)
++ checkAnswer(
++ df.select(
++ regexp_replace($"a", "(\\d+)", "num"),
++ regexp_replace($"a", $"b", $"c"),
++ regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
++ Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
++ Row("num-num", "400-400", "100") :: Nil)
++
++ // for testing the mutable state of the expression in code gen.
++ // This is a hack way to enable the codegen, thus the codegen is enable
by default,
++ // it will still use the interpretProjection if projection followed by
a LocalRelation,
++ // hence we add a filter operator.
++ // See the optimizer rule `ConvertToLocalRelation`
++ checkAnswer(
++ df.filter("isnotnull(a)").selectExpr(
++ "regexp_replace(a, b, c)",
++ "regexp_extract(a, b, 1)"),
++ Row("300", "100") :: Row("400", "100") :: Row("400-400", "100") ::
Nil)
++ }
+ }
+
+ test("non-matching optional group") {
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+index 3ba48da0e32..401a2851b66 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
+ import org.apache.spark.sql.catalyst.expressions.{EqualTo, NamedExpression,
OuterReference, SubqueryExpression}
+ import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join,
LogicalPlan, Project, Sort, Union}
++import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec,
CometNativeScanExec, CometScanExec}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecution}
+ import org.apache.spark.sql.execution.datasources.FileScanRDD
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -1529,6 +1530,18 @@ class SubquerySuite extends QueryTest
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
++ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
++ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _))))
=>
++ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++ fs.inputRDDs().forall(
++ _.asInstanceOf[FileScanRDD].filePartitions.forall(
++ _.files.forall(_.urlEncodedPath.contains("p=0"))))
++ case CometNativeColumnarToRowExec(
++ fs: CometNativeScanExec) =>
++ fs.partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++ fs.inputRDDs().forall(
++ _.asInstanceOf[FileScanRDD].filePartitions.forall(
++ _.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case _ => false
+ })
+ }
+@@ -2094,7 +2107,7 @@ class SubquerySuite extends QueryTest
+
+ df.collect()
+ val exchanges = collect(df.queryExecution.executedPlan) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ }
+ assert(exchanges.size === 1)
+ }
+@@ -2678,18 +2691,25 @@ class SubquerySuite extends QueryTest
+ def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
+ val df = sql(query)
+ checkAnswer(df, answer)
+- val fileSourceScanExec = collect(df.queryExecution.executedPlan) {
+- case f: FileSourceScanExec => f
++ val dataSourceScanExec = collect(df.queryExecution.executedPlan) {
++ case f: FileSourceScanLike => f
++ case c: CometScanExec => c
++ case n: CometNativeScanExec => n
+ }
+ sparkContext.listenerBus.waitUntilEmpty()
+- assert(fileSourceScanExec.size === 1)
+- val scalarSubquery =
fileSourceScanExec.head.dataFilters.flatMap(_.collect {
+- case s: ScalarSubquery => s
+- })
++ assert(dataSourceScanExec.size === 1)
++ val scalarSubquery = dataSourceScanExec.head match {
++ case f: FileSourceScanLike =>
++ f.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
++ case c: CometScanExec =>
++ c.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
++ case n: CometNativeScanExec =>
++ n.dataFilters.flatMap(_.collect { case s: ScalarSubquery => s })
++ }
+ assert(scalarSubquery.length === 1)
+ assert(scalarSubquery.head.plan.isInstanceOf[ReusedSubqueryExec])
+- assert(fileSourceScanExec.head.metrics("numFiles").value === 1)
+- assert(fileSourceScanExec.head.metrics("numOutputRows").value ===
answer.size)
++ assert(dataSourceScanExec.head.metrics("numFiles").value === 1)
++ assert(dataSourceScanExec.head.metrics("numOutputRows").value ===
answer.size)
+ }
+
+ withTable("t1", "t2") {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+index fee375db10a..8c2c24e2c5f 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
+ import org.apache.spark.types.variant._
+ import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+
+-class VariantShreddingSuite extends QueryTest with SharedSparkSession with
ParquetTest {
++class VariantShreddingSuite extends QueryTest with SharedSparkSession with
ParquetTest
++ // TODO enable tests once
https://github.com/apache/datafusion-comet/issues/2209 is fixed
++ with IgnoreCometSuite {
+ def parseJson(s: String): VariantVal = {
+ val v = VariantBuilder.parseJson(s, false)
+ new VariantVal(v.getValue, v.getMetadata)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+index 6cdf681d65c..34a0e3714bd 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+@@ -26,6 +26,8 @@ import org.apache.spark.sql.{AnalysisException, Row}
+ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
+ import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.util.CollationFactory
++import org.apache.spark.sql.comet.{CometBroadcastHashJoinExec,
CometHashJoinExec, CometSortMergeJoinExec}
++import org.apache.spark.sql.comet.CometHashAggregateExec
+ import org.apache.spark.sql.connector.{DatasourceV2SQLBase,
FakeV2ProviderWithCustomSchema}
+ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
InMemoryTable}
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+@@ -57,7 +59,9 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ assert(
+ collectFirst(queryPlan) {
+ case _: SortMergeJoinExec => assert(isSortMergeForced)
++ case _: CometSortMergeJoinExec => assert(isSortMergeForced)
+ case _: HashJoin => assert(!isSortMergeForced)
++ case _: CometHashJoinExec | _: CometBroadcastHashJoinExec =>
assert(!isSortMergeForced)
+ }.nonEmpty
+ )
+ }
+@@ -465,6 +469,7 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ val dfBinary = sql(s"SELECT COUNT(*), c FROM $tableNameBinary GROUP
BY c")
+ assert(collectFirst(dfBinary.queryExecution.executedPlan) {
+ case _: HashAggregateExec | _: ObjectHashAggregateExec => ()
++ case _: CometHashAggregateExec => ()
+ }.nonEmpty)
+ }
+ }
+@@ -1550,10 +1555,14 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ if
(!CollationFactory.fetchCollation(t.collation).supportsBinaryEquality) {
+ assert(collectFirst(queryPlan) {
+ case b: HashJoin => b.leftKeys.head
++ case ch: CometHashJoinExec => ch.leftKeys.head
++ case cbh: CometBroadcastHashJoinExec => cbh.leftKeys.head
+ }.head.isInstanceOf[CollationKey])
+ } else {
+ assert(!collectFirst(queryPlan) {
+ case b: HashJoin => b.leftKeys.head
++ case ch: CometHashJoinExec => ch.leftKeys.head
++ case cbh: CometBroadcastHashJoinExec => cbh.leftKeys.head
+ }.head.isInstanceOf[CollationKey])
+ }
+ }
+@@ -1609,11 +1618,13 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ if
(!CollationFactory.fetchCollation(t.collation).supportsBinaryEquality) {
+ assert(collectFirst(queryPlan) {
+ case b: BroadcastHashJoinExec => b.leftKeys.head
++ case b: CometBroadcastHashJoinExec => b.leftKeys.head
+
}.head.asInstanceOf[ArrayTransform].function.asInstanceOf[LambdaFunction].
+ function.isInstanceOf[CollationKey])
+ } else {
+ assert(!collectFirst(queryPlan) {
+ case b: BroadcastHashJoinExec => b.leftKeys.head
++ case b: CometBroadcastHashJoinExec => b.leftKeys.head
+ }.head.isInstanceOf[ArrayTransform])
+ }
+ }
+@@ -1679,6 +1690,7 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ } else {
+ assert(!collectFirst(queryPlan) {
+ case b: BroadcastHashJoinExec => b.leftKeys.head
++ case b: CometBroadcastHashJoinExec => b.leftKeys.head
+ }.head.isInstanceOf[ArrayTransform])
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+index a09b7e0827c..ffc29f764bc 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+@@ -29,6 +29,7 @@ import org.apache.spark.SparkUnsupportedOperationException
+ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
++import org.apache.spark.sql.comet.CometSortExec
+ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
+ import org.apache.spark.sql.connector.catalog.{PartitionInternalRow,
SupportsRead, Table, TableCapability, TableProvider}
+ import org.apache.spark.sql.connector.catalog.TableCapability._
+@@ -41,7 +42,7 @@ import org.apache.spark.sql.execution.SortExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2Relation, DataSourceV2ScanRelation, V2ScanPartitioningAndOrdering}
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+-import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{Exchange,
ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+@@ -283,13 +284,13 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
+ val groupByColJ = df.groupBy($"j").agg(sum($"i"))
+ checkAnswer(groupByColJ, Seq(Row(2, 8), Row(4, 2), Row(6, 5)))
+ assert(collectFirst(groupByColJ.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined)
+
+ val groupByIPlusJ = df.groupBy($"i" + $"j").agg(count("*"))
+ checkAnswer(groupByIPlusJ, Seq(Row(5, 2), Row(6, 2), Row(8, 1),
Row(9, 1)))
+ assert(collectFirst(groupByIPlusJ.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined)
+ }
+ }
+@@ -349,10 +350,11 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
+
+ val (shuffleExpected, sortExpected) = groupByExpects
+ assert(collectFirst(groupBy.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined === shuffleExpected)
+ assert(collectFirst(groupBy.queryExecution.executedPlan) {
+ case e: SortExec => e
++ case c: CometSortExec => c
+ }.isDefined === sortExpected)
+ }
+
+@@ -367,10 +369,11 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
+
+ val (shuffleExpected, sortExpected) = windowFuncExpects
+
assert(collectFirst(windowPartByColIOrderByColJ.queryExecution.executedPlan) {
+- case e: ShuffleExchangeExec => e
++ case e: ShuffleExchangeLike => e
+ }.isDefined === shuffleExpected)
+
assert(collectFirst(windowPartByColIOrderByColJ.queryExecution.executedPlan) {
+ case e: SortExec => e
++ case c: CometSortExec => c
+ }.isDefined === sortExpected)
+ }
+ }
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+index 2a0ab21ddb0..6030e7c2b9b 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
+ import org.apache.spark.{SparkConf, SparkException}
+ import org.apache.spark.sql.QueryTest
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
+ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
Table, TableCapability}
+ import org.apache.spark.sql.connector.read.ScanBuilder
+ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+@@ -188,7 +189,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest
with SharedSparkSession {
+ val df = spark.read.format(format).load(path.getCanonicalPath)
+ checkAnswer(df, inputData.toDF())
+ assert(
+-
df.queryExecution.executedPlan.exists(_.isInstanceOf[FileSourceScanExec]))
++ df.queryExecution.executedPlan.exists {
++ case _: FileSourceScanExec | _: CometScanExec | _:
CometNativeScanExec => true
++ case _ => false
++ }
++ )
+ }
+ } finally {
+ spark.listenerManager.unregister(listener)
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+index 7c830bf6c6e..6d9c643d83e 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+@@ -24,6 +24,8 @@ import org.apache.spark.sql.{DataFrame, Row}
+ import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.expressions.{Literal,
TransformExpression}
+ import org.apache.spark.sql.catalyst.plans.physical
++import org.apache.spark.sql.comet.CometSortMergeJoinExec
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.connector.catalog.{Column, Identifier,
InMemoryTableCatalog}
+ import org.apache.spark.sql.connector.catalog.functions._
+ import org.apache.spark.sql.connector.distributions.Distributions
+@@ -32,7 +34,7 @@ import
org.apache.spark.sql.connector.expressions.Expressions._
+ import org.apache.spark.sql.execution.SparkPlan
+ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec,
ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+ import org.apache.spark.sql.functions.{col, max}
+ import org.apache.spark.sql.internal.SQLConf
+@@ -300,19 +302,21 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
+ Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
+ }
+
+- private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] =
{
++ private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeLike] =
{
+ collect(plan) {
+ case s: ShuffleExchangeExec => s
++ case c: CometShuffleExchangeExec => c
+ }
+ }
+
+- private def collectShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
++ private def collectShuffles(plan: SparkPlan): Seq[ShuffleExchangeLike] = {
+ // here we skip collecting shuffle operators that are not associated with
SMJ
+ collect(plan) {
+ case s: SortMergeJoinExec => s
++ case c: CometSortMergeJoinExec => c.originalPlan
+ }.flatMap(smj =>
+ collect(smj) {
+- case s: ShuffleExchangeExec => s
++ case s: ShuffleExchangeLike => s
+ })
+ }
+
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+index 7c4852c5e22..d1a34456bdc 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+@@ -21,7 +21,7 @@ package org.apache.spark.sql.connector
+ import java.sql.Date
+ import java.util.Collections
+
+-import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame, Row}
++import org.apache.spark.sql.{catalyst, AnalysisException, DataFrame,
IgnoreCometSuite, Row}
+ import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression,
Cast, Literal}
+ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+ import org.apache.spark.sql.catalyst.plans.physical
+@@ -45,7 +45,8 @@ import org.apache.spark.sql.util.QueryExecutionListener
+ import org.apache.spark.tags.SlowSQLTest
+
+ @SlowSQLTest
+-class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase {
++class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
++ with IgnoreCometSuite {
+ import testImplicits._
+
+ before {
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+index fcecaf25d4c..e5a511022cc 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+@@ -31,7 +31,7 @@ import org.mockito.Mockito.{mock, spy, when}
+ import org.scalatest.time.SpanSugar._
+
+ import org.apache.spark._
+-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoder,
KryoData, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Encoder,
IgnoreComet, KryoData, QueryTest, Row, SaveMode}
+ import org.apache.spark.sql.catalyst.FunctionIdentifier
+ import org.apache.spark.sql.catalyst.analysis.{NamedParameter,
UnresolvedGenerator}
+ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+@@ -267,7 +267,8 @@ class QueryExecutionErrorsSuite
+ }
+
+ test("INCONSISTENT_BEHAVIOR_CROSS_VERSION: " +
+- "compatibility with Spark 2.4/3.2 in reading/writing dates") {
++ "compatibility with Spark 2.4/3.2 in reading/writing dates",
++ IgnoreComet("Comet doesn't completely support datetime rebase mode yet"))
{
Review Comment:
This test has been ignored since 3.4.3:
```
% grep -B 1 "IgnoreComet.*(\"" dev/diffs/3.4.3.diff | grep -B 1 "datetime rebase"
+ "compatibility with Spark 2.4/3.2 in reading/writing dates",
+ IgnoreComet("Comet doesn't completely support datetime rebase mode yet")) {
```
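
The diff above repeatedly skips Spark tests either by tagging them with `IgnoreComet("reason")` or by mixing `IgnoreCometSuite` into a suite. The sketch below only illustrates the ScalaTest tagging mechanism behind that pattern; the `IgnoreComet` definition, the suite name, and the exclusion command shown here are assumptions for illustration, not the actual Comet test helpers.

```scala
import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical stand-in for the real tag used throughout the diff; the actual
// IgnoreComet / IgnoreCometSuite definitions ship with Comet's Spark test
// helpers and may differ from this sketch.
case class IgnoreComet(reason: String) extends Tag("org.apache.spark.sql.IgnoreComet")

class ExampleJoinSuite extends AnyFunSuite {

  test("covered by Comet") {
    assert(Seq(1, 2, 3).sum == 6)
  }

  // Tagged tests can be excluded at run time with ScalaTest's -l option, e.g.:
  //   testOnly ExampleJoinSuite -- -l org.apache.spark.sql.IgnoreComet
  test("Spark-only behaviour", IgnoreComet("Comet SMJ doesn't spill yet")) {
    assert(Seq(3, 2, 1).sorted == Seq(1, 2, 3))
  }
}
```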