andygrove commented on code in PR #4093:
URL: https://github.com/apache/datafusion-comet/pull/4093#discussion_r3174800960


##########
dev/diffs/4.1.1.diff:
##########
@@ -0,0 +1,4326 @@
+diff --git 
a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+index 6df8bc85b51..dabb75e2b75 100644
+--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
++++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+@@ -268,6 +268,11 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
+   }
+ 
+   test("Upload from all decommissioned executors") {
++    // Comet replaces Spark's shuffle with its own native shuffle, which is 
incompatible with
++    // the fallback storage migration path used by BlockManagerDecommissioner.
++    val cometEnv = System.getenv("ENABLE_COMET")
++    assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++      "Skipped when Comet is enabled: incompatible with Comet native shuffle 
storage")
+     sc = new SparkContext(getSparkConf(2, 2))
+     withSpark(sc) { sc =>
+       TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+@@ -298,6 +303,11 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
+   }
+ 
+   test("Upload multi stages") {
++    // Comet replaces Spark's shuffle with its own native shuffle, which is 
incompatible with
++    // the fallback storage migration path used by BlockManagerDecommissioner.
++    val cometEnv = System.getenv("ENABLE_COMET")
++    assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++      "Skipped when Comet is enabled: incompatible with Comet native shuffle 
storage")
+     sc = new SparkContext(getSparkConf())
+     withSpark(sc) { sc =>
+       TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+@@ -332,6 +342,11 @@ class FallbackStorageSuite extends SparkFunSuite with 
LocalSparkContext {
+ 
+   CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
+     test(s"$codec - Newly added executors should access old data from remote 
storage") {
++      // Comet replaces Spark's shuffle with its own native shuffle, which is 
incompatible with
++      // the fallback storage migration path used by 
BlockManagerDecommissioner.
++      val cometEnv = System.getenv("ENABLE_COMET")
++      assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++        "Skipped when Comet is enabled: incompatible with Comet native 
shuffle storage")
+       sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, 
codec))
+       withSpark(sc) { sc =>
+         TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+diff --git a/pom.xml b/pom.xml
+index dc757d78812..18841e95f3d 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -152,6 +152,8 @@
+     <kryo.version>4.0.3</kryo.version>
+     <ivy.version>2.5.3</ivy.version>
+     <oro.version>2.0.8</oro.version>
++    <spark.version.short>4.1</spark.version.short>
++    <comet.version>0.16.0-SNAPSHOT</comet.version>
+     <!--
+     If you change codahale.metrics.version, you also need to change
+     the link to metrics.dropwizard.io in docs/monitoring.md.
+@@ -2594,6 +2596,25 @@
+         <artifactId>arpack</artifactId>
+         <version>${netlib.ludovic.dev.version}</version>
+       </dependency>
++      <dependency>
++        <groupId>org.apache.datafusion</groupId>
++        
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++        <version>${comet.version}</version>
++        <exclusions>
++          <exclusion>
++            <groupId>org.apache.spark</groupId>
++            <artifactId>spark-sql_${scala.binary.version}</artifactId>
++          </exclusion>
++          <exclusion>
++            <groupId>org.apache.spark</groupId>
++            <artifactId>spark-core_${scala.binary.version}</artifactId>
++          </exclusion>
++          <exclusion>
++            <groupId>org.apache.spark</groupId>
++            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
++          </exclusion>
++        </exclusions>
++      </dependency>
+       <!-- SPARK-16484 add `datasketches-java` for support Datasketches 
HllSketch -->
+       <dependency>
+         <groupId>org.apache.datasketches</groupId>
+diff --git a/sql/core/pom.xml b/sql/core/pom.xml
+index d2d07a08aa9..d89f80e5b68 100644
+--- a/sql/core/pom.xml
++++ b/sql/core/pom.xml
+@@ -97,6 +97,10 @@
+       <groupId>org.apache.spark</groupId>
+       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+     </dependency>
++    <dependency>
++      <groupId>org.apache.datafusion</groupId>
++      
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
++    </dependency>
+ 
+     <!--
+       This spark-tags test-dep is needed even though it isn't used in this 
module, otherwise testing-cmds that exclude
+diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+index c47e8454162..758d80a0bcb 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+@@ -1218,6 +1218,23 @@ object SparkSession extends SparkSessionCompanion with 
Logging {
+     extensions
+   }
+ 
++  /**
++   * Whether Comet extension is enabled (ENABLE_COMET unset, "1" or "true"); "0" means off.
++   */
++  def isCometEnabled: Boolean = {
++    val v = System.getenv("ENABLE_COMET")
++    v == null || v == "1" || (v != "0" && v.toBoolean) // "0".toBoolean would throw
++  }
++
++
++  private def loadCometExtension(sparkContext: SparkContext): Seq[String] = {
++    // The env-derived isCometEnabled is only the default for spark.comet.enabled.
++    val cometOn = sparkContext.getConf.getBoolean("spark.comet.enabled", isCometEnabled)
++    val cometExtension = "org.apache.comet.CometSparkSessionExtensions"
++    // Contribute the Comet extension class name only when Comet is switched on.
++    if (cometOn) Seq(cometExtension) else Seq.empty
++  }
++
+   /**
+    * Initialize extensions specified in [[StaticSQLConf]]. The classes will 
be applied to the
+    * extensions passed into this function.
+@@ -1227,7 +1244,8 @@ object SparkSession extends SparkSessionCompanion with 
Logging {
+       extensions: SparkSessionExtensions): SparkSessionExtensions = {
+     val extensionConfClassNames = 
sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
+       .getOrElse(Seq.empty)
+-    extensionConfClassNames.foreach { extensionConfClassName =>
++    val extensionClassNames = extensionConfClassNames ++ 
loadCometExtension(sparkContext)
++    extensionClassNames.foreach { extensionConfClassName =>
+       try {
+         val extensionConfClass = Utils.classForName(extensionConfClassName)
+         val extensionConf = extensionConfClass.getConstructor().newInstance()
+diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+index 4410fe50912..43bcce2a038 100644
+--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
++++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.annotation.DeveloperApi
+ import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, 
LogicalPlan}
++import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
+ import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
+ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+@@ -84,6 +85,7 @@ private[execution] object SparkPlanInfo {
+     // dump the file scan metadata (e.g file path) to event log
+     val metadata = plan match {
+       case fileScan: FileSourceScanLike => fileScan.metadata
++      case cometScan: CometScanExec => cometScan.metadata
+       case _ => Map[String, String]()
+     }
+     val childrenInfo = children.flatMap {
+diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+index 69b4001ff34..6fda691652d 100644
+--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
++++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+@@ -1,7 +1,7 @@
+ -- Automatically generated by SQLQueryTestSuite
+ -- !query
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+-    (1, 2), 
++    (1, 2),
+     (1, 2),
+     (1, 3),
+     (1, 3),
+@@ -11,7 +11,7 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+     AS tab1(k, v)
+ -- !query analysis
+ CreateViewCommand `tab1`, SELECT * FROM VALUES
+-    (1, 2), 
++    (1, 2),
+     (1, 2),
+     (1, 3),
+     (1, 3),
+@@ -26,8 +26,8 @@ CreateViewCommand `tab1`, SELECT * FROM VALUES
+ 
+ -- !query
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+-    (1, 2), 
+-    (1, 2), 
++    (1, 2),
++    (1, 2),
+     (2, 3),
+     (3, 4),
+     (null, null),
+@@ -35,8 +35,8 @@ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+     AS tab2(k, v)
+ -- !query analysis
+ CreateViewCommand `tab2`, SELECT * FROM VALUES
+-    (1, 2), 
+-    (1, 2), 
++    (1, 2),
++    (1, 2),
+     (2, 3),
+     (3, 4),
+     (null, null),
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql 
b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
+index 13bbd9d81b7..541cdfb1e04 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
+@@ -15,6 +15,12 @@
+ --   limitations under the License.
+ --
+ 
++-- TODO: Disabled because one of the tests fails on Spark 4.0
++-- TODO: https://github.com/apache/datafusion-comet/issues/1948
++-- The following query failed
++-- select /*+ COALESCE(1) */ id, a+b, a-b, a*b, a/b from decimals_test order 
by id
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW t AS SELECT 1.0 as a, 0.0 as b;
+ 
+ -- division, remainder and pmod by 0 return NULL
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql 
b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+index e28f0721a64..788b43c242a 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+@@ -1,3 +1,7 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
++-- EXCEPT ALL with GROUP BY returns incorrect results on Spark 4.1
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+     (0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1);
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+index 7aef901da4f..f3d6e18926d 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
+@@ -2,3 +2,4 @@
+ 
+ --SET spark.sql.adaptive.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+index eeb2180f7a5..afd1b5ec289 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain-cbo.sql
+@@ -1,5 +1,6 @@
+ --SET spark.sql.cbo.enabled=true
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+ 
+ CREATE TABLE explain_temp1(a INT, b INT) USING PARQUET;
+ CREATE TABLE explain_temp2(c INT, d INT) USING PARQUET;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql 
b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+index 96dddafd82a..efedbbb3c0e 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+@@ -1,6 +1,7 @@
+ --SET spark.sql.codegen.wholeStage = true
+ --SET spark.sql.adaptive.enabled = false
+ --SET spark.sql.maxMetadataStringLength = 500
++--SET spark.comet.enabled = false
+ 
+ -- Test tables
+ CREATE table  explain_temp1 (key int, val int) USING PARQUET;
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
+index 1f53ca359fe..0f572f7c6ce 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/having-and-order-by-recursive-type-name-resolution.sql
+@@ -1,3 +1,7 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4123)
++-- Comet native sort lacks row-format support for Struct(Map(...)) sort keys
++--SET spark.comet.enabled = false
++
+ -- This test file contains queries that test recursive types name resolution 
in ORDER BY and HAVING clauses.
+ 
+ -- Alias type: String, Table column type: Struct
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql 
b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+index 35128da97fd..25b873ae859 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/hll.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+@@ -1,3 +1,8 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4121)
++-- Comet's native scan rejects invalid UTF-8 byte sequences inserted into the
++-- string test table, which Spark allows.
++--SET spark.comet.enabled = false
++
+ -- Positive test cases
+ -- Create a table with some testing data.
+ DROP TABLE IF EXISTS t1;
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql 
b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+index 077caa5dd44..697457d4251 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+@@ -1,5 +1,9 @@
++-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
++-- INTERSECT ALL with GROUP BY returns incorrect results on Spark 4.1
++--SET spark.comet.enabled = false
++
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+-    (1, 2), 
++    (1, 2),
+     (1, 2),
+     (1, 3),
+     (1, 3),
+@@ -8,8 +12,8 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+     (null, null)
+     AS tab1(k, v);
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+-    (1, 2), 
+-    (1, 2), 
++    (1, 2),
++    (1, 2),
+     (2, 3),
+     (3, 4),
+     (null, null),
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+index 41fd4de2a09..162d5a817b6 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
+@@ -6,6 +6,10 @@
+ -- 
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
+ 
+ -- Test aggregate operator with codegen on and off.
++
++-- Floating-point precision difference between DataFusion and JVM for FILTER 
aggregates
++--SET spark.comet.enabled = false
++
+ --CONFIG_DIM1 spark.sql.codegen.wholeStage=true
+ --CONFIG_DIM1 
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
+ --CONFIG_DIM1 
spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ -- 
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+ 
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ -- 
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+ 
+ -- PostgreSQL implicitly casts string literals to data with integral types, 
but
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ -- 
https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+index 7c816d8a416..b1551a2b296 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+@@ -1,6 +1,23 @@
+ -- A test suite for IN LIMIT in parent side, subquery, and both predicate 
subquery
+ -- It includes correlated cases.
+ 
++-- TODO: Disabled because one of the tests fails on Spark 4.0
++-- TODO: https://github.com/apache/datafusion-comet/issues/1948
++-- The following query failed
++-- SELECT Count(DISTINCT( t1a )),
++--        t1b
++-- FROM   t1
++-- WHERE  t1d NOT IN (SELECT t2d
++--                    FROM   t2
++--                    WHERE t2b > t1b
++--                    ORDER  BY t2b DESC nulls first, t2d
++--     LIMIT 1
++-- OFFSET 1)
++-- GROUP  BY t1b
++-- ORDER BY t1b NULLS last
++--     LIMIT  1
++-- OFFSET 1;
++--SET spark.comet.enabled = false
+ --CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=true
+ --CONFIG_DIM1 spark.sql.optimizeNullAwareAntiJoin=false
+ 
+@@ -61,6 +78,7 @@ WHERE  t1a IN (SELECT t2a
+                WHERE  t1d = t2d)
+ LIMIT  2;
+ 
++--SET spark.sql.cbo.enabled=true
+ -- correlated IN subquery
+ -- LIMIT on both parent and subquery sides
+ SELECT *
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql 
b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+--- 
a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++ 
b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+ 
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+ 
+diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql 
b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+ 
+ -- In COMPENSATION views get invalidated if the type can't cast
+diff --git 
a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out 
b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+index 44f95f225ab..361866fc298 100644
+--- a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
++++ b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+@@ -1,7 +1,7 @@
+ -- Automatically generated by SQLQueryTestSuite
+ -- !query
+ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
+-    (1, 2), 
++    (1, 2),
+     (1, 2),
+     (1, 3),
+     (1, 3),
+@@ -17,8 +17,8 @@ struct<>
+ 
+ -- !query
+ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
+-    (1, 2), 
+-    (1, 2), 
++    (1, 2),
++    (1, 2),
+     (2, 3),
+     (3, 4),
+     (null, null),
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+index 0d807aeae4d..6d7744e771b 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+@@ -49,7 +49,7 @@ import 
org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp
+ import org.apache.spark.sql.execution.columnar._
+ import org.apache.spark.sql.execution.command.CommandUtils
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -534,7 +534,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
+       df.collect()
+     }
+     assert(
+-      collect(df.queryExecution.executedPlan) { case e: ShuffleExchangeExec 
=> e }.size == expected)
++      collect(df.queryExecution.executedPlan) {
++        case _: ShuffleExchangeLike => 1 }.size == expected)
+   }
+ 
+   test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
+@@ -1673,9 +1674,18 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils
+           _.nodeName.contains("TableCacheQueryStage"))
+         val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+           _.nodeName.contains("AdaptiveSparkPlan"))
+-        val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+-          _.nodeName.contains("ResultQueryStage"))
+-        aqePlanRoot.get.children.head.nodeName == "AQEShuffleRead"
++        // Spark 4.0 wraps results in ResultQueryStage. The coalescing 
indicator is AQEShuffleRead
++        // as the direct child of InputAdapter.
++        // AdaptiveSparkPlan -> ResultQueryStage -> WholeStageCodegen ->
++        //     CometColumnarToRow -> InputAdapter -> AQEShuffleRead (if 
coalesced)
++        val resultStage = aqeNode.get.children.head  // ResultQueryStage
++        val wsc = resultStage.children.head           // WholeStageCodegen
++        val c2r = wsc.children.head                   // ColumnarToRow or 
CometColumnarToRow
++        val inputAdapter = c2r.children.head           // InputAdapter
++        resultStage.nodeName  ==   "ResultQueryStage" &&
++          wsc.nodeName.startsWith("WholeStageCodegen") && // could be 
"WholeStageCodegen (1)"
++          (c2r.nodeName ==  "CometColumnarToRow" || c2r.nodeName == 
"ColumnarToRow") &&
++          inputAdapter.children.head.nodeName == "AQEShuffleRead"
+       }
+ 
+       withTempView("t0", "t1", "t2") {
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+index 0dfd37ebeae..66340218c7c 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+@@ -31,7 +31,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+ import org.apache.spark.sql.execution.WholeStageCodegenExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
+-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
++import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+ import org.apache.spark.sql.expressions.Window
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -856,7 +856,7 @@ class DataFrameAggregateSuite extends QueryTest
+       assert(objHashAggPlans.nonEmpty)
+ 
+       val exchangePlans = collect(aggPlan) {
+-        case shuffle: ShuffleExchangeExec => shuffle
++        case shuffle: ShuffleExchangeLike => shuffle
+       }
+       assert(exchangePlans.length == 1)
+     }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+index ed182322aec..1ae6afa686a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+@@ -435,7 +435,9 @@ class DataFrameJoinSuite extends QueryTest
+ 
+     withTempDatabase { dbName =>
+       withTable(table1Name, table2Name) {
+-        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
++        withSQLConf(
++            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
++            "spark.comet.enabled" -> "false") {
+           spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+           spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+ 
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+index 93ff7becaec..7b2871cc656 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+@@ -20,6 +20,7 @@ package org.apache.spark.sql
+ import java.sql.{Date, Timestamp}
+ import java.util.Locale
+ 
++import org.apache.spark.sql.IgnoreComet
+ import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
+ import org.apache.spark.sql.catalyst.plans.logical.Union
+ import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+@@ -1511,7 +1512,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+     }
+   }
+ 
+-  test("SPARK-52921: union partitioning - reused shuffle") {
++  test("SPARK-52921: union partitioning - reused shuffle",
+      IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+       val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", 
"b", "c")
+       val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", 
"b", "c")
+@@ -1538,7 +1540,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+     }
+   }
+ 
+-  test("SPARK-52921: union partitioning - semantic equality") {
++  test("SPARK-52921: union partitioning - semantic equality",
+      IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+     val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", 
"c")
+     val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", 
"f")
+ 
+@@ -1589,7 +1592,8 @@ class DataFrameSetOperationsSuite extends QueryTest
+     }
+   }
+ 
+-  test("SPARK-52921: union partitioning - range partitioning") {
++  test("SPARK-52921: union partitioning - range partitioning",
+      IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
+     val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", 
"c")
+     val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", 
"f")
+ 
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+index 5b88eeefeca..d4f07bc182a 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+@@ -36,11 +36,12 @@ import 
org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
+ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+ import org.apache.spark.sql.catalyst.parser.ParseException
+ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, 
LocalRelation, LogicalPlan, OneRowRelation}
++import org.apache.spark.sql.comet.CometBroadcastExchangeExec
+ import org.apache.spark.sql.connector.FakeV2Provider
+ import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, 
QueryExecution, SortExec, WholeStageCodegenExec}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.expressions.{Aggregator, Window}
+ import org.apache.spark.sql.functions._
+ import org.apache.spark.sql.internal.SQLConf
+@@ -1493,7 +1494,7 @@ class DataFrameSuite extends QueryTest
+           fail("Should not have back to back Aggregates")
+         }
+         atFirstAgg = true
+-      case e: ShuffleExchangeExec => atFirstAgg = false
++      case e: ShuffleExchangeLike => atFirstAgg = false
+       case _ =>
+     }
+   }
+@@ -1683,7 +1684,7 @@ class DataFrameSuite extends QueryTest
+       checkAnswer(join, df)
+       assert(
+         collect(join.queryExecution.executedPlan) {
+-          case e: ShuffleExchangeExec => true }.size === 1)
++          case _: ShuffleExchangeLike => true }.size === 1)
+       assert(
+         collect(join.queryExecution.executedPlan) { case e: 
ReusedExchangeExec => true }.size === 1)
+       val broadcasted = broadcast(join)
+@@ -1691,10 +1692,12 @@ class DataFrameSuite extends QueryTest
+       checkAnswer(join2, df)
+       assert(
+         collect(join2.queryExecution.executedPlan) {
+-          case e: ShuffleExchangeExec => true }.size == 1)
++          case _: ShuffleExchangeLike => true }.size == 1)
+       assert(
+         collect(join2.queryExecution.executedPlan) {
+-          case e: BroadcastExchangeExec => true }.size === 1)
++          case e: BroadcastExchangeExec => true
++          case _: CometBroadcastExchangeExec => true
++        }.size === 1)
+       assert(
+         collect(join2.queryExecution.executedPlan) { case e: 
ReusedExchangeExec => true }.size == 4)
+     }
+@@ -2092,7 +2095,7 @@ class DataFrameSuite extends QueryTest
+ 
+     // Assert that no extra shuffle introduced by cogroup.
+     val exchanges = collect(df3.queryExecution.executedPlan) {
+-      case h: ShuffleExchangeExec => h
++      case h: ShuffleExchangeLike => h
+     }
+     assert(exchanges.size == 2)
+   }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 6e9f3385571..15615f3e417 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, 
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+     }
+ 
+     def isShuffleExecByRequirement(
+-        plan: ShuffleExchangeExec,
++        plan: ShuffleExchangeLike,
+         desiredClusterColumns: Seq[String]): Boolean = plan match {
+       case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, 
_) =>
+         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++      case CometShuffleExchangeExec(op: HashPartitioning, _, _, 
ENSURE_REQUIREMENTS, _, _) =>
++        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+       case _ => false
+     }
+ 
+@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+       val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+         case w: WindowExec =>
+           w.child.exists {
+-            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
++            case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
+             case _ => false
+           }
+         case _ => false
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+index 6df8d66ee7f..35e270c7241 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+@@ -46,7 +46,7 @@ import 
org.apache.spark.sql.catalyst.trees.DataFrameQueryContext
+ import org.apache.spark.sql.catalyst.util.sideBySide
+ import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+ import org.apache.spark.sql.expressions.UserDefinedFunction
+ import org.apache.spark.sql.functions._
+@@ -2420,7 +2420,7 @@ class DatasetSuite extends QueryTest
+ 
+     // Assert that no extra shuffle introduced by cogroup.
+     val exchanges = collect(df3.queryExecution.executedPlan) {
+-      case h: ShuffleExchangeExec => h
++      case h: ShuffleExchangeLike => h
+     }
+     assert(exchanges.size == 2)
+   }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+index e1a2fd33c7c..9a93daa8f5a 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
+ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Expression}
+ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
+ import org.apache.spark.sql.catalyst.plans.ExistenceJoin
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec, 
CometSubqueryBroadcastExec}
+ import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, 
InMemoryTableWithV2FilterCatalog}
+ import org.apache.spark.sql.execution._
+ import org.apache.spark.sql.execution.adaptive._
+@@ -193,6 +194,7 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+     val subqueryBroadcast = dpExprs.collect {
+       case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b
++      case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b
+     }
+ 
+     val hasFilter = if (withSubquery) "Should" else "Shouldn't"
+@@ -247,6 +249,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     val buf = 
collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
+       case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) =>
+         b.indices.map(idx => b.buildKeys(idx))
++      case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) =>
++        b.indices.map(idx => b.buildKeys(idx))
+     }
+     assert(buf.distinct.size == n)
+   }
+@@ -262,6 +266,12 @@ abstract class DynamicPartitionPruningSuiteBase
+       case s: BatchScanExec => s.runtimeFilters.collect {
+         case d: DynamicPruningExpression => d.child
+       }
++      case s: CometScanExec => s.partitionFilters.collect {
++        case d: DynamicPruningExpression => d.child
++      }
++      case s: CometNativeScanExec => s.partitionFilters.collect {
++        case d: DynamicPruningExpression => d.child
++      }
+       case _ => Nil
+     }
+   }
+@@ -1204,10 +1214,16 @@ abstract class DynamicPartitionPruningSuiteBase
+ 
+       val plan = df.queryExecution.executedPlan
+       val countSubqueryBroadcasts =
+-        collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 
}).sum
++        collectWithSubqueries(plan)({
++          case _: SubqueryBroadcastExec => 1
++          case _: CometSubqueryBroadcastExec => 1
++        }).sum
+ 
+       val countReusedSubqueryBroadcasts =
+-        collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: 
SubqueryBroadcastExec) => 1}).sum
++        collectWithSubqueries(plan)({
++          case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
++          case ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => 1
++        }).sum
+ 
+       assert(countSubqueryBroadcasts == 1)
+       assert(countReusedSubqueryBroadcasts == 1)
+@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
+   }
+ 
+   test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+-    "canonicalization and exchange reuse") {
++    "canonicalization and exchange reuse",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/4045")) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+           SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
+@@ -1331,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
+   }
+ 
+   test("Subquery reuse across the whole plan",
++    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
+     DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+@@ -1425,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++  test("SPARK-34637: DPP side broadcast query stage is created firstly",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/4045")) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       val df = sql(
+         """ WITH v as (
+@@ -1579,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ 
+         val subqueryBroadcastExecs = 
collectWithSubqueries(df.queryExecution.executedPlan) {
+           case s: SubqueryBroadcastExec => s
++          case s: CometSubqueryBroadcastExec => s
+         }
+         assert(subqueryBroadcastExecs.size === 1)
+         subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
+@@ -1731,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+               case s: BatchScanExec =>
+                 // we use f1 col for v2 tables due to schema pruning
+                 s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
++              case s: CometScanExec =>
++                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
++              case s: CometNativeScanExec =>
++                s.output.exists(_.exists(_.argString(maxFields = 
100).contains("fid")))
+               case _ => false
+             }
+           assert(scanOption.isDefined)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+index b27122a8de2..a4c5aac8212 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+@@ -470,7 +470,8 @@ class ExplainSuite extends ExplainSuiteHelper with 
DisableAdaptiveExecutionSuite
+     }
+   }
+ 
+-  test("Explain formatted output for scan operator for datasource V2") {
++  test("Explain formatted output for scan operator for datasource V2",
++      IgnoreComet("Comet explain output is different")) {

Review Comment:
   The 3.4.3 diff already applies the same ignore:
   
   ```
   % grep -B 1 "IgnoreComet.*(\"" dev/diffs/3.4.3.diff | grep -B 1 "explain output is different"
   +  test("Explain formatted output for scan operator for datasource V2",
   +      IgnoreComet("Comet explain output is different")) {
   --
   +  test("explain with table on DSv1 data source",
   +      IgnoreComet("Comet explain output is different")) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to