This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 47a5fb6e2 test: enable ignored 4.0 tests, enable ansi mode (#3454)
47a5fb6e2 is described below

commit 47a5fb6e2629974aad846db99ac9e3d9ce39be39
Author: Parth Chandra <[email protected]>
AuthorDate: Tue Mar 17 16:02:12 2026 -0700

    test: enable ignored 4.0 tests, enable ansi mode (#3454)
    
    * fix: Update 4.0.1 diff file. Enable Ansi mode, enable 4.0 tests, fix failing tests.
---
 dev/diffs/4.0.1.diff | 326 +++++++++++++++++++++++----------------------------
 1 file changed, 148 insertions(+), 178 deletions(-)
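
The recurring pattern in the hunks below is an environment-variable gate: the patched Spark suites read ENABLE_COMET and skip, via ScalaTest's assume(), tests that cannot run against Comet's native shuffle or whose timing changes under Comet. The following is a minimal standalone sketch of that pattern, assuming only ScalaTest; the CometTestGate object and the example suite name are illustrative and are not part of the patch.

    import org.scalatest.funsuite.AnyFunSuite

    // Illustrative helper mirroring the isCometEnabled check the patch adds to
    // org.apache.spark.sql.classic.SparkSession; the object name here is made up.
    object CometTestGate {
      // Comet is treated as enabled when ENABLE_COMET is unset, "1", or "true".
      def isCometEnabled: Boolean = {
        val v = System.getenv("ENABLE_COMET")
        v == null || v == "1" || v.toBoolean
      }
    }

    class ShuffleMigrationExampleSuite extends AnyFunSuite {
      test("fallback storage migration (illustrative)") {
        // assume() cancels (skips) the test instead of failing it when the
        // condition is false, which is how the patched suites opt out under Comet.
        assume(!CometTestGate.isCometEnabled,
          "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
        // ... body exercising Spark's built-in shuffle would go here ...
      }
    }

Gating inside the test itself, rather than deleting the test from the diff, keeps the upstream suite intact and lets the same diff run with Comet either on or off.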

diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index d4c53676c..a41ff3bbd 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1,3 +1,43 @@
+diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+index 6c51bd4ff2e..e72ec1d26e2 100644
+--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
++++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+@@ -231,6 +231,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+   }
+ 
+   test("Upload from all decommissioned executors") {
++    // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++    // the fallback storage migration path used by BlockManagerDecommissioner.
++    val cometEnv = System.getenv("ENABLE_COMET")
++    assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++      "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+     sc = new SparkContext(getSparkConf(2, 2))
+     withSpark(sc) { sc =>
+       TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+@@ -261,6 +266,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+   }
+ 
+   test("Upload multi stages") {
++    // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++    // the fallback storage migration path used by BlockManagerDecommissioner.
++    val cometEnv = System.getenv("ENABLE_COMET")
++    assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++      "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+     sc = new SparkContext(getSparkConf())
+     withSpark(sc) { sc =>
+       TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+@@ -295,6 +305,11 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
+ 
+   CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
+     test(s"$codec - Newly added executors should access old data from remote storage") {
++      // Comet replaces Spark's shuffle with its own native shuffle, which is incompatible with
++      // the fallback storage migration path used by BlockManagerDecommissioner.
++      val cometEnv = System.getenv("ENABLE_COMET")
++      assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++        "Skipped when Comet is enabled: incompatible with Comet native shuffle storage")
+       sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
+       withSpark(sc) { sc =>
+         TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 diff --git a/pom.xml b/pom.xml
 index 22922143fc3..7c56e5e8641 100644
 --- a/pom.xml
@@ -53,7 +93,7 @@ index dcf6223a98b..0458a5bb640 100644
      <!--
        This spark-tags test-dep is needed even though it isn't used in this 
module, otherwise testing-cmds that exclude
 diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
-index 0015d7ff99e..c9dd85e72c4 100644
+index 0015d7ff99e..dcbf0325904 100644
 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
 +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
 @@ -1042,6 +1042,23 @@ object SparkSession extends SparkSessionCompanion with Logging {
@@ -65,7 +105,7 @@ index 0015d7ff99e..c9dd85e72c4 100644
 +   */
 +  def isCometEnabled: Boolean = {
 +    val v = System.getenv("ENABLE_COMET")
-+    v == null || v.toBoolean
++    v == null || v == "1" || v.toBoolean
 +  }
 +
 +
@@ -332,7 +372,7 @@ index 1f8c5822e7d..b7de4e28813 100644
  WITH t(c1) AS (SELECT replace(listagg(DISTINCT col1 COLLATE unicode_rtrim) 
COLLATE utf8_binary, ' ', '') FROM (VALUES ('xbc  '), ('xbc '), ('a'), 
('xbc'))) SELECT len(c1), regexp_count(c1, 'a'), regexp_count(c1, 'xbc') FROM t
  -- !query schema
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
-index 0f42502f1d9..f616024a9c2 100644
+index 0f42502f1d9..e9ff802141f 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -354,33 +394,30 @@ index 0f42502f1d9..f616024a9c2 100644
    }
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
-@@ -1626,7 +1627,9 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils
-     }
-   }
- 
--  test("SPARK-35332: Make cache plan disable configs configurable - check 
AQE") {
-+  test("SPARK-35332: Make cache plan disable configs configurable - check 
AQE",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
-@@ -1661,7 +1664,12 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils
+@@ -1659,9 +1660,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
+           _.nodeName.contains("TableCacheQueryStage"))
+         val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
            _.nodeName.contains("AdaptiveSparkPlan"))
-         val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
-           _.nodeName.contains("ResultQueryStage"))
+-        val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+-          _.nodeName.contains("ResultQueryStage"))
 -        aqePlanRoot.get.children.head.nodeName == "AQEShuffleRead"
-+        aqeNode.get.children.head.nodeName == "AQEShuffleRead" ||
-+          (aqeNode.get.children.head.nodeName.contains("WholeStageCodegen") &&
-+            aqeNode.get.children.head.children.head.nodeName == "ColumnarToRow" &&
-+            aqeNode.get.children.head.children.head.children.head.nodeName == "InputAdapter" &&
-+            aqeNode.get.children.head.children.head.children.head.children.head.nodeName ==
-+              "AQEShuffleRead")
++        // Spark 4.0 wraps results in ResultQueryStage. The coalescing indicator is AQEShuffleRead
++        // as the direct child of InputAdapter.
++        // AdaptiveSparkPlan -> ResultQueryStage -> WholeStageCodegen ->
++        //     CometColumnarToRow -> InputAdapter -> AQEShuffleRead (if coalesced)
++        val resultStage = aqeNode.get.children.head  // ResultQueryStage
++        val wsc = resultStage.children.head           // WholeStageCodegen
++        val c2r = wsc.children.head                   // ColumnarToRow or CometColumnarToRow
++        val inputAdapter = c2r.children.head           // InputAdapter
++        resultStage.nodeName == "ResultQueryStage" &&
++          wsc.nodeName.startsWith("WholeStageCodegen") && // could be "WholeStageCodegen (1)"
++          (c2r.nodeName == "CometColumnarToRow" || c2r.nodeName == "ColumnarToRow") &&
++          inputAdapter.children.head.nodeName == "AQEShuffleRead"
        }
  
        withTempView("t0", "t1", "t2") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 9db406ff12f..abbc91f5c11 100644
+index 9db406ff12f..245e4caa319 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -30,7 +30,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
@@ -401,17 +438,6 @@ index 9db406ff12f..abbc91f5c11 100644
        }
        assert(exchangePlans.length == 1)
      }
-@@ -2241,7 +2241,9 @@ class DataFrameAggregateSuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-47430 Support GROUP BY MapType") {
-+  test("SPARK-47430 Support GROUP BY MapType",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     def genMapData(dataType: String): String = {
-       s"""
-         |case when id % 4 == 0 then map()
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
 index ed182322aec..1ae6afa686a 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -1144,8 +1170,23 @@ index c1c041509c3..7d463e4b85e 100644
        .withExtensions { extensions =>
          extensions.injectColumnar(session =>
            MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) 
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+index 5ba69c8f9d9..ac1256afe88 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+@@ -82,6 +82,10 @@ class SparkSessionJobTaggingAndCancellationSuite
+   }
+ 
+   test("Tags set from session are prefixed with session UUID") {
++    // This test relies on job scheduling order which is timing-dependent and becomes unreliable
++    // when Comet is enabled due to changes in async execution behaviour.
++    assume(!classic.SparkSession.isCometEnabled,
++      "Skipped when Comet is enabled: test results are timing-dependent")
+     sc = new SparkContext("local[2]", "test")
+     val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate()
+     import session.implicits._
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index 0df7f806272..52d33d67328 100644
+index 0df7f806272..92390bd819f 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
 @@ -17,6 +17,8 @@
@@ -1212,7 +1253,7 @@ index 0df7f806272..52d33d67328 100644
  
    test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 2e33f6505ab..e1e93ab3bad 100644
+index 2e33f6505ab..47fa031add5 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 @@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
@@ -1250,14 +1291,7 @@ index 2e33f6505ab..e1e93ab3bad 100644
        }
        assert(exchanges.size === 1)
      }
-@@ -2674,22 +2681,32 @@ class SubquerySuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-43402: FileSourceScanExec supports push down data filter with 
scalar subquery") {
-+  test("SPARK-43402: FileSourceScanExec supports push down data filter with 
scalar subquery",
-+    IgnoreComet("TODO: ignore for first stage of 4.0, " +
-+    "https://github.com/apache/datafusion-comet/issues/1948";)) {
+@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest
      def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
        val df = sql(query)
        checkAnswer(df, answer)
@@ -1308,15 +1342,10 @@ index fee375db10a..8c2c24e2c5f 100644
      val v = VariantBuilder.parseJson(s, false)
      new VariantVal(v.getValue, v.getMetadata)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-index 11e9547dfc5..be9ae40ab3d 100644
+index 11e9547dfc5..637411056ae 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation
- import scala.jdk.CollectionConverters.MapHasAsJava
- 
- import org.apache.spark.SparkException
--import org.apache.spark.sql.{AnalysisException, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
+@@ -24,6 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Row}
  import org.apache.spark.sql.catalyst.ExtendedAnalysisException
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.util.CollationFactory
@@ -1334,18 +1363,28 @@ index 11e9547dfc5..be9ae40ab3d 100644
        }.nonEmpty
      )
    }
-@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
-     }
-   }
- 
--  test("hash join should be used for collated strings if sort merge join is 
not forced") {
-+  test("hash join should be used for collated strings if sort merge join is 
not forced",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     val t1 = "T_1"
-     val t2 = "T_2"
- 
-@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
+@@ -1547,10 +1550,14 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+             if (!CollationFactory.fetchCollation(t.collation).supportsBinaryEquality) {
+               assert(collectFirst(queryPlan) {
+                 case b: HashJoin => b.leftKeys.head
++                case ch: CometHashJoinExec => ch.leftKeys.head
++                case cbh: CometBroadcastHashJoinExec => cbh.leftKeys.head
+               }.head.isInstanceOf[CollationKey])
+             } else {
+               assert(!collectFirst(queryPlan) {
+                 case b: HashJoin => b.leftKeys.head
++                case ch: CometHashJoinExec => ch.leftKeys.head
++                case cbh: CometBroadcastHashJoinExec => cbh.leftKeys.head
+               }.head.isInstanceOf[CollationKey])
+             }
+           }
+@@ -1606,11 +1613,13 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+             if (!CollationFactory.fetchCollation(t.collation).supportsBinaryEquality) {
+               assert(collectFirst(queryPlan) {
+                 case b: BroadcastHashJoinExec => b.leftKeys.head
++                case b: CometBroadcastHashJoinExec => b.leftKeys.head
+               }.head.asInstanceOf[ArrayTransform].function.asInstanceOf[LambdaFunction].
+                 function.isInstanceOf[CollationKey])
              } else {
                assert(!collectFirst(queryPlan) {
                  case b: BroadcastHashJoinExec => b.leftKeys.head
@@ -1353,7 +1392,7 @@ index 11e9547dfc5..be9ae40ab3d 100644
                }.head.isInstanceOf[ArrayTransform])
              }
            }
-@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
+@@ -1676,6 +1685,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
              } else {
                assert(!collectFirst(queryPlan) {
                  case b: BroadcastHashJoinExec => b.leftKeys.head
@@ -1361,17 +1400,6 @@ index 11e9547dfc5..be9ae40ab3d 100644
                }.head.isInstanceOf[ArrayTransform])
              }
            }
-@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
-     }
-   }
- 
--  test("rewrite with collationkey shouldn't disrupt multiple join 
conditions") {
-+  test("rewrite with collationkey shouldn't disrupt multiple join conditions",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     val t1 = "T_1"
-     val t2 = "T_2"
- 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 index 3eeed2e4175..9f21d547c1c 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1461,23 +1489,19 @@ index 2a0ab21ddb0..6030e7c2b9b 100644
          } finally {
            spark.listenerManager.unregister(listener)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
-index c73e8e16fbb..88cd0d47da3 100644
+index c73e8e16fbb..399f1442ad5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
-@@ -20,10 +20,11 @@ import java.sql.Timestamp
- import java.util.Collections
- 
- import org.apache.spark.SparkConf
--import org.apache.spark.sql.{DataFrame, Row}
-+import org.apache.spark.sql.{DataFrame, IgnoreComet, Row}
+@@ -24,6 +24,8 @@ import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Literal, 
TransformExpression}
  import org.apache.spark.sql.catalyst.plans.physical
 +import org.apache.spark.sql.comet.CometSortMergeJoinExec
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
  import org.apache.spark.sql.connector.catalog.{Column, Identifier, 
InMemoryTableCatalog}
  import org.apache.spark.sql.connector.catalog.functions._
  import org.apache.spark.sql.connector.distributions.Distributions
-@@ -32,7 +33,7 @@ import 
org.apache.spark.sql.connector.expressions.Expressions._
+@@ -32,7 +34,7 @@ import 
org.apache.spark.sql.connector.expressions.Expressions._
  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -1486,7 +1510,15 @@ index c73e8e16fbb..88cd0d47da3 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.internal.SQLConf._
-@@ -305,13 +306,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
+@@ -299,19 +301,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
+         Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
+   }
+ 
+-  private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
++  private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeLike] = {
+     collect(plan) {
+       case s: ShuffleExchangeExec => s
++      case c: CometShuffleExchangeExec => c
      }
    }
  
@@ -1503,17 +1535,6 @@ index c73e8e16fbb..88cd0d47da3 100644
        })
    }
  
-@@ -370,7 +372,9 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
-     checkAnswer(df.sort("res"), Seq(Row(10.0), Row(15.5), Row(41.0)))
-   }
- 
--  test("SPARK-48655: order by on partition keys should not introduce 
additional shuffle") {
-+  test("SPARK-48655: order by on partition keys should not introduce 
additional shuffle",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     val items_partitions = Array(identity("price"), identity("id"))
-     createTable(items, itemsColumns, items_partitions)
-     sql(s"INSERT INTO testcat.ns.$items VALUES " +
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
 index f62e092138a..c0404bfe85e 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -1583,17 +1604,14 @@ index 418ca3430bb..eb8267192f8 100644
        withTempPath { path =>
          val dir = path.getCanonicalPath
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
-index d1b11a74cf3..08087c80201 100644
+index d1b11a74cf3..75e4600863a 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
-@@ -17,8 +17,9 @@
- 
- package org.apache.spark.sql.execution
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
  
--import org.apache.spark.sql.{Dataset, QueryTest}
-+import org.apache.spark.sql.{Dataset, IgnoreComet, QueryTest}
+ import org.apache.spark.sql.{Dataset, QueryTest}
  import org.apache.spark.sql.IntegratedUDFTestUtils._
-+import org.apache.spark.sql.comet.CometCollectLimitExec
++import org.apache.spark.sql.comet.{CometCollectLimitExec, 
CometGlobalLimitExec, CometProjectExec, CometSortExec}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.functions.rand
  import org.apache.spark.sql.internal.SQLConf
@@ -1606,39 +1624,26 @@ index d1b11a74cf3..08087c80201 100644
        case _ => false
      }.isDefined)
    }
-@@ -77,7 +78,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
-     assert(!hasLocalSort(physicalPlan))
+@@ -47,6 +48,7 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+   private def assertHasGlobalLimitExec(plan: SparkPlan): Unit = {
+     assert(find(plan) {
+       case _: GlobalLimitExec => true
++      case _: CometGlobalLimitExec => true
+       case _ => false
+     }.isDefined)
    }
- 
--  test("root LIMIT preserves data ordering with CollectLimitExec") {
-+  test("root LIMIT preserves data ordering with CollectLimitExec",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
-       val df = spark.range(10).orderBy($"id" % 8).limit(2)
-       df.collect()
-@@ -88,7 +91,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
-     }
+@@ -55,6 +57,11 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+     find(plan) {
+       case GlobalLimitExec(_, s: SortExec, _) => !s.global
+       case GlobalLimitExec(_, ProjectExec(_, s: SortExec), _) => !s.global
++      case CometGlobalLimitExec(_, _, _, _, s: CometSortExec, _) =>
++        !s.originalPlan.asInstanceOf[SortExec].global
++      case CometGlobalLimitExec(_, _, _, _,
++        CometProjectExec(_, _, _, _, s: CometSortExec, _), _) =>
++        !s.originalPlan.asInstanceOf[SortExec].global
+       case _ => false
+     }.isDefined
    }
- 
--  test("middle LIMIT preserves data ordering with the extra sort") {
-+  test("middle LIMIT preserves data ordering with the extra sort",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     withSQLConf(
-       SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
-       // To trigger the bug, we have to disable the coalescing optimization. 
Otherwise we use only
-@@ -117,7 +122,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
-     assert(!hasLocalSort(physicalPlan))
-   }
- 
--  test("middle OFFSET preserves data ordering with the extra sort") {
-+  test("middle OFFSET preserves data ordering with the extra sort",
-+    IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+      "https://github.com/apache/datafusion-comet/issues/1948";)) {
-     val df = 1.to(10).map(v => v -> v).toDF("c1", "c2").orderBy($"c1" % 8)
-     verifySortAdded(df.offset(2))
-     verifySortAdded(df.filter($"c2" > rand()).offset(2))
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
 index 743ec41dbe7..9f30d6c8e04 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -3490,7 +3495,7 @@ index 86c4e49f6f6..2e639e5f38d 100644
      val tblTargetName = "tbl_target"
      val tblSourceQualified = s"default.$tblSourceName"
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..d64e4e54e22 100644
+index f0f3f94b811..c9d0ecfec41 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
@@ -3547,7 +3552,7 @@ index f0f3f94b811..d64e4e54e22 100644
      }
    }
  
-@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase
+@@ -248,8 +271,24 @@ private[sql] trait SQLTestUtilsBase
      override protected def converter: ColumnNodeToExpressionConverter = 
self.spark.converter
    }
  
@@ -3557,15 +3562,6 @@ index f0f3f94b811..d64e4e54e22 100644
 +  protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
 +
 +  /**
-+   * Whether to enable ansi mode This is only effective when
-+   * [[isCometEnabled]] returns true.
-+   */
-+  protected def enableCometAnsiMode: Boolean = {
-+    val v = System.getenv("ENABLE_COMET_ANSI_MODE")
-+    v != null && v.toBoolean
-+  }
-+
-+  /**
 +   * Whether Spark should only apply Comet scan optimization. This is only 
effective when
 +   * [[isCometEnabled]] returns true.
 +   */
@@ -3581,7 +3577,7 @@ index f0f3f94b811..d64e4e54e22 100644
      super.withSQLConf(pairs: _*)(f)
    }
  
-@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +490,8 @@ private[sql] trait SQLTestUtilsBase
      val schema = df.schema
      val withoutFilters = df.queryExecution.executedPlan.transform {
        case FilterExec(_, child) => child
@@ -3591,10 +3587,10 @@ index f0f3f94b811..d64e4e54e22 100644
  
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index 245219c1756..7d2ef1b9145 100644
+index 245219c1756..a611836f086 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -75,6 +75,31 @@ trait SharedSparkSessionBase
+@@ -75,6 +75,27 @@ trait SharedSparkSessionBase
        // this rule may potentially block testing of other optimization rules 
such as
        // ConstantPropagation etc.
        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName)
@@ -3618,10 +3614,6 @@ index 245219c1756..7d2ef1b9145 100644
 +          .set("spark.comet.exec.shuffle.enabled", "false")
 +      }
 +
-+      if (enableCometAnsiMode) {
-+        conf
-+          .set("spark.sql.ansi.enabled", "true")
-+      }
 +    }
      conf.set(
        StaticSQLConf.WAREHOUSE_PATH,
@@ -3662,19 +3654,10 @@ index 52abd248f3a..7a199931a08 100644
          case d: DynamicPruningExpression => d.child
        }
 diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
-index 4b27082e188..09f591dfed3 100644
+index 4b27082e188..6710c90c789 100644
 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
 +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
-@@ -17,7 +17,7 @@
- 
- package org.apache.spark.sql.hive
- 
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreComet, QueryTest, Row}
- import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
- import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
- import org.apache.spark.sql.hive.test.TestHiveSingleton
-@@ -147,11 +147,15 @@ class HiveUDFDynamicLoadSuite extends QueryTest with 
SQLTestUtils with TestHiveS
+@@ -147,7 +147,9 @@ class HiveUDFDynamicLoadSuite extends QueryTest with 
SQLTestUtils with TestHiveS
  
      // This jar file should not be placed to the classpath.
      val jarPath = "src/test/noclasspath/hive-test-udfs.jar"
@@ -3685,13 +3668,6 @@ index 4b27082e188..09f591dfed3 100644
      val jarUrl = s"file://${System.getProperty("user.dir")}/$jarPath"
  
      test("Spark should be able to run Hive UDF using jar regardless of " +
--      s"current thread context classloader (${udfInfo.identifier}") {
-+      s"current thread context classloader (${udfInfo.identifier}",
-+      IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+        "https://github.com/apache/datafusion-comet/issues/1948";)) {
-       Utils.withContextClassLoader(Utils.getSparkClassLoader) {
-         withUserDefinedFunction(udfInfo.funcName -> false) {
-           val sparkClassLoader = Thread.currentThread().getContextClassLoader
 diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
 index cc7bb193731..06555d48da7 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -3729,10 +3705,10 @@ index b67370f6eb9..746b3974b29 100644
    override def beforeEach(): Unit = {
      super.beforeEach()
 diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-index a394d0b7393..d29b3058897 100644
+index a394d0b7393..a4bc3d3fd8e 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-@@ -53,24 +53,47 @@ object TestHive
+@@ -53,24 +53,41 @@ object TestHive
      new SparkContext(
        System.getProperty("spark.sql.test.master", "local[1]"),
        "TestSQLContext",
@@ -3784,12 +3760,6 @@ index a394d0b7393..d29b3058897 100644
 +              .set("spark.comet.exec.enabled", "false")
 +              .set("spark.comet.exec.shuffle.enabled", "false")
 +          }
-+
-+          val a = System.getenv("ENABLE_COMET_ANSI_MODE")
-+          if (a != null && a.toBoolean) {
-+            conf
-+              .set("spark.sql.ansi.enabled", "true")
-+          }
 +        }
 +
 +        conf


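A second pattern in the hunks above widens plan-shape assertions so they match both the Spark operators (HashJoin, BroadcastHashJoinExec, ShuffleExchangeExec, GlobalLimitExec) and their Comet counterparts (CometHashJoinExec, CometBroadcastHashJoinExec, CometShuffleExchangeExec, CometGlobalLimitExec). The sketch below illustrates that idea with a toy plan model; the PlanNode classes and collectFirst helper are invented for illustration and are not Spark's SparkPlan API.

    // Toy plan model for illustration only; this is not Spark's SparkPlan hierarchy.
    sealed trait PlanNode { def children: Seq[PlanNode] }
    case class SparkHashJoin(children: Seq[PlanNode]) extends PlanNode
    case class CometHashJoin(children: Seq[PlanNode]) extends PlanNode
    case class Scan() extends PlanNode { val children: Seq[PlanNode] = Nil }

    object PlanAsserts {
      // Depth-first search, loosely analogous to the collectFirst helpers the suites use.
      def collectFirst[T](plan: PlanNode)(pf: PartialFunction[PlanNode, T]): Option[T] =
        pf.lift(plan).orElse(plan.children.view.flatMap(c => collectFirst(c)(pf)).headOption)

      // Accept either the Spark operator or its Comet replacement, mirroring how the
      // patch adds cases like `case ch: CometHashJoinExec` next to `case b: HashJoin`.
      def hasHashJoin(plan: PlanNode): Boolean =
        collectFirst(plan) {
          case j: SparkHashJoin => j
          case j: CometHashJoin => j
        }.isDefined
    }

Matching both operator families keeps the assertions meaningful whether or not Comet has replaced the corresponding Spark operator in the executed plan.
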
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
