This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 47a5fb6e2 test: enable ignored 4.0 tests, enable ansi mode (#3454)
47a5fb6e2 is described below
commit 47a5fb6e2629974aad846db99ac9e3d9ce39be39
Author: Parth Chandra <[email protected]>
AuthorDate: Tue Mar 17 16:02:12 2026 -0700
test: enable ignored 4.0 tests, enable ansi mode (#3454)
* fix: Update 4.0.1 diff file. Enable Ansi mode, enable 4.0 tests, fix
failing tests.
---
dev/diffs/4.0.1.diff | 326 +++++++++++++++++++++++----------------------------
1 file changed, 148 insertions(+), 178 deletions(-)
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index d4c53676c..a41ff3bbd 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1,3 +1,43 @@
+diff --git
a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+index 6c51bd4ff2e..e72ec1d26e2 100644
+--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
++++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+@@ -231,6 +231,11 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
+ }
+
+ test("Upload from all decommissioned executors") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is
incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle
storage")
+ sc = new SparkContext(getSparkConf(2, 2))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+@@ -261,6 +266,11 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
+ }
+
+ test("Upload multi stages") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is
incompatible with
++ // the fallback storage migration path used by BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native shuffle
storage")
+ sc = new SparkContext(getSparkConf())
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+@@ -295,6 +305,11 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
+
+ CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
+ test(s"$codec - Newly added executors should access old data from remote
storage") {
++ // Comet replaces Spark's shuffle with its own native shuffle, which is
incompatible with
++ // the fallback storage migration path used by
BlockManagerDecommissioner.
++ val cometEnv = System.getenv("ENABLE_COMET")
++ assume(cometEnv == null || cometEnv == "0" || cometEnv == "false",
++ "Skipped when Comet is enabled: incompatible with Comet native
shuffle storage")
+ sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC,
codec))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
diff --git a/pom.xml b/pom.xml
index 22922143fc3..7c56e5e8641 100644
--- a/pom.xml
@@ -53,7 +93,7 @@ index dcf6223a98b..0458a5bb640 100644
<!--
This spark-tags test-dep is needed even though it isn't used in this
module, otherwise testing-cmds that exclude
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
-index 0015d7ff99e..c9dd85e72c4 100644
+index 0015d7ff99e..dcbf0325904 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -1042,6 +1042,23 @@ object SparkSession extends SparkSessionCompanion with
Logging {
@@ -65,7 +105,7 @@ index 0015d7ff99e..c9dd85e72c4 100644
+ */
+ def isCometEnabled: Boolean = {
+ val v = System.getenv("ENABLE_COMET")
-+ v == null || v.toBoolean
++ v == null || v == "1" || v.toBoolean
+ }
+
+
@@ -332,7 +372,7 @@ index 1f8c5822e7d..b7de4e28813 100644
WITH t(c1) AS (SELECT replace(listagg(DISTINCT col1 COLLATE unicode_rtrim)
COLLATE utf8_binary, ' ', '') FROM (VALUES ('xbc '), ('xbc '), ('a'),
('xbc'))) SELECT len(c1), regexp_count(c1, 'a'), regexp_count(c1, 'xbc') FROM t
-- !query schema
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
-index 0f42502f1d9..f616024a9c2 100644
+index 0f42502f1d9..e9ff802141f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -354,33 +394,30 @@ index 0f42502f1d9..f616024a9c2 100644
}
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
-@@ -1626,7 +1627,9 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
- }
- }
-
-- test("SPARK-35332: Make cache plan disable configs configurable - check
AQE") {
-+ test("SPARK-35332: Make cache plan disable configs configurable - check
AQE",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
- SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
-@@ -1661,7 +1664,12 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
+@@ -1659,9 +1660,18 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
+ _.nodeName.contains("TableCacheQueryStage"))
+ val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
_.nodeName.contains("AdaptiveSparkPlan"))
- val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
- _.nodeName.contains("ResultQueryStage"))
+- val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+- _.nodeName.contains("ResultQueryStage"))
- aqePlanRoot.get.children.head.nodeName == "AQEShuffleRead"
-+ aqeNode.get.children.head.nodeName == "AQEShuffleRead" ||
-+ (aqeNode.get.children.head.nodeName.contains("WholeStageCodegen") &&
-+ aqeNode.get.children.head.children.head.nodeName ==
"ColumnarToRow" &&
-+ aqeNode.get.children.head.children.head.children.head.nodeName ==
"InputAdapter" &&
-+
aqeNode.get.children.head.children.head.children.head.children.head.nodeName ==
-+ "AQEShuffleRead")
++ // Spark 4.0 wraps results in ResultQueryStage. The coalescing
indicator is AQEShuffleRead
++ // as the direct child of InputAdapter.
++            // AdaptiveSparkPlan -> ResultQueryStage -> WholeStageCodegen ->
++ // CometColumnarToRow -> InputAdapter -> AQEShuffleRead (if
coalesced)
++ val resultStage = aqeNode.get.children.head // ResultQueryStage
++ val wsc = resultStage.children.head // WholeStageCodegen
++ val c2r = wsc.children.head // ColumnarToRow or
CometColumnarToRow
++ val inputAdapter = c2r.children.head // InputAdapter
++ resultStage.nodeName == "ResultQueryStage" &&
++ wsc.nodeName.startsWith("WholeStageCodegen") && // could be
"WholeStageCodegen (1)"
++ (c2r.nodeName == "CometColumnarToRow" || c2r.nodeName ==
"ColumnarToRow") &&
++ inputAdapter.children.head.nodeName == "AQEShuffleRead"
}
withTempView("t0", "t1", "t2") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 9db406ff12f..abbc91f5c11 100644
+index 9db406ff12f..245e4caa319 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
@@ -401,17 +438,6 @@ index 9db406ff12f..abbc91f5c11 100644
}
assert(exchangePlans.length == 1)
}
-@@ -2241,7 +2241,9 @@ class DataFrameAggregateSuite extends QueryTest
- }
- }
-
-- test("SPARK-47430 Support GROUP BY MapType") {
-+ test("SPARK-47430 Support GROUP BY MapType",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- def genMapData(dataType: String): String = {
- s"""
- |case when id % 4 == 0 then map()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index ed182322aec..1ae6afa686a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -1144,8 +1170,23 @@ index c1c041509c3..7d463e4b85e 100644
.withExtensions { extensions =>
extensions.injectColumnar(session =>
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+index 5ba69c8f9d9..ac1256afe88 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala
+@@ -82,6 +82,10 @@ class SparkSessionJobTaggingAndCancellationSuite
+ }
+
+ test("Tags set from session are prefixed with session UUID") {
++ // This test relies on job scheduling order which is timing-dependent and
becomes unreliable
++ // when Comet is enabled due to changes in async execution behaviour.
++ assume(!classic.SparkSession.isCometEnabled,
++ "Skipped when Comet is enabled: test results are timing-dependent")
+ sc = new SparkContext("local[2]", "test")
+ val session =
classic.SparkSession.builder().sparkContext(sc).getOrCreate()
+ import session.implicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
-index 0df7f806272..52d33d67328 100644
+index 0df7f806272..92390bd819f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -17,6 +17,8 @@
@@ -1212,7 +1253,7 @@ index 0df7f806272..52d33d67328 100644
test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 2e33f6505ab..e1e93ab3bad 100644
+index 2e33f6505ab..47fa031add5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
@@ -1250,14 +1291,7 @@ index 2e33f6505ab..e1e93ab3bad 100644
}
assert(exchanges.size === 1)
}
-@@ -2674,22 +2681,32 @@ class SubquerySuite extends QueryTest
- }
- }
-
-- test("SPARK-43402: FileSourceScanExec supports push down data filter with
scalar subquery") {
-+ test("SPARK-43402: FileSourceScanExec supports push down data filter with
scalar subquery",
-+ IgnoreComet("TODO: ignore for first stage of 4.0, " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
+@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest
def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
val df = sql(query)
checkAnswer(df, answer)
@@ -1308,15 +1342,10 @@ index fee375db10a..8c2c24e2c5f 100644
val v = VariantBuilder.parseJson(s, false)
new VariantVal(v.getValue, v.getMetadata)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-index 11e9547dfc5..be9ae40ab3d 100644
+index 11e9547dfc5..637411056ae 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
-@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation
- import scala.jdk.CollectionConverters.MapHasAsJava
-
- import org.apache.spark.SparkException
--import org.apache.spark.sql.{AnalysisException, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
+@@ -24,6 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.CollationFactory
@@ -1334,18 +1363,28 @@ index 11e9547dfc5..be9ae40ab3d 100644
}.nonEmpty
)
}
-@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
- }
- }
-
-- test("hash join should be used for collated strings if sort merge join is
not forced") {
-+ test("hash join should be used for collated strings if sort merge join is
not forced",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- val t1 = "T_1"
- val t2 = "T_2"
-
-@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+@@ -1547,10 +1550,14 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ if
(!CollationFactory.fetchCollation(t.collation).supportsBinaryEquality) {
+ assert(collectFirst(queryPlan) {
+ case b: HashJoin => b.leftKeys.head
++ case ch: CometHashJoinExec => ch.leftKeys.head
++ case cbh: CometBroadcastHashJoinExec => cbh.leftKeys.head
+ }.head.isInstanceOf[CollationKey])
+ } else {
+ assert(!collectFirst(queryPlan) {
+ case b: HashJoin => b.leftKeys.head
++ case ch: CometHashJoinExec => ch.leftKeys.head
++ case cbh: CometBroadcastHashJoinExec => cbh.leftKeys.head
+ }.head.isInstanceOf[CollationKey])
+ }
+ }
+@@ -1606,11 +1613,13 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+ if
(!CollationFactory.fetchCollation(t.collation).supportsBinaryEquality) {
+ assert(collectFirst(queryPlan) {
+ case b: BroadcastHashJoinExec => b.leftKeys.head
++ case b: CometBroadcastHashJoinExec => b.leftKeys.head
+
}.head.asInstanceOf[ArrayTransform].function.asInstanceOf[LambdaFunction].
+ function.isInstanceOf[CollationKey])
} else {
assert(!collectFirst(queryPlan) {
case b: BroadcastHashJoinExec => b.leftKeys.head
@@ -1353,7 +1392,7 @@ index 11e9547dfc5..be9ae40ab3d 100644
}.head.isInstanceOf[ArrayTransform])
}
}
-@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
+@@ -1676,6 +1685,7 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
} else {
assert(!collectFirst(queryPlan) {
case b: BroadcastHashJoinExec => b.leftKeys.head
@@ -1361,17 +1400,6 @@ index 11e9547dfc5..be9ae40ab3d 100644
}.head.isInstanceOf[ArrayTransform])
}
}
-@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
- }
- }
-
-- test("rewrite with collationkey shouldn't disrupt multiple join
conditions") {
-+ test("rewrite with collationkey shouldn't disrupt multiple join conditions",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- val t1 = "T_1"
- val t2 = "T_2"
-
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 3eeed2e4175..9f21d547c1c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1461,23 +1489,19 @@ index 2a0ab21ddb0..6030e7c2b9b 100644
} finally {
spark.listenerManager.unregister(listener)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
-index c73e8e16fbb..88cd0d47da3 100644
+index c73e8e16fbb..399f1442ad5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
-@@ -20,10 +20,11 @@ import java.sql.Timestamp
- import java.util.Collections
-
- import org.apache.spark.SparkConf
--import org.apache.spark.sql.{DataFrame, Row}
-+import org.apache.spark.sql.{DataFrame, IgnoreComet, Row}
+@@ -24,6 +24,8 @@ import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal,
TransformExpression}
import org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.comet.CometSortMergeJoinExec
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.connector.catalog.{Column, Identifier,
InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.distributions.Distributions
-@@ -32,7 +33,7 @@ import
org.apache.spark.sql.connector.expressions.Expressions._
+@@ -32,7 +34,7 @@ import
org.apache.spark.sql.connector.expressions.Expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -1486,7 +1510,15 @@ index c73e8e16fbb..88cd0d47da3 100644
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
-@@ -305,13 +306,14 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
+@@ -299,19 +301,21 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
+ Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
+ }
+
+- private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] =
{
++ private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeLike] =
{
+ collect(plan) {
+ case s: ShuffleExchangeExec => s
++ case c: CometShuffleExchangeExec => c
}
}
@@ -1503,17 +1535,6 @@ index c73e8e16fbb..88cd0d47da3 100644
})
}
-@@ -370,7 +372,9 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
- checkAnswer(df.sort("res"), Seq(Row(10.0), Row(15.5), Row(41.0)))
- }
-
-- test("SPARK-48655: order by on partition keys should not introduce
additional shuffle") {
-+ test("SPARK-48655: order by on partition keys should not introduce
additional shuffle",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- val items_partitions = Array(identity("price"), identity("id"))
- createTable(items, itemsColumns, items_partitions)
- sql(s"INSERT INTO testcat.ns.$items VALUES " +
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index f62e092138a..c0404bfe85e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -1583,17 +1604,14 @@ index 418ca3430bb..eb8267192f8 100644
withTempPath { path =>
val dir = path.getCanonicalPath
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
-index d1b11a74cf3..08087c80201 100644
+index d1b11a74cf3..75e4600863a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
-@@ -17,8 +17,9 @@
-
- package org.apache.spark.sql.execution
+@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
--import org.apache.spark.sql.{Dataset, QueryTest}
-+import org.apache.spark.sql.{Dataset, IgnoreComet, QueryTest}
+ import org.apache.spark.sql.{Dataset, QueryTest}
import org.apache.spark.sql.IntegratedUDFTestUtils._
-+import org.apache.spark.sql.comet.CometCollectLimitExec
++import org.apache.spark.sql.comet.{CometCollectLimitExec,
CometGlobalLimitExec, CometProjectExec, CometSortExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.internal.SQLConf
@@ -1606,39 +1624,26 @@ index d1b11a74cf3..08087c80201 100644
case _ => false
}.isDefined)
}
-@@ -77,7 +78,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
- assert(!hasLocalSort(physicalPlan))
+@@ -47,6 +48,7 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+ private def assertHasGlobalLimitExec(plan: SparkPlan): Unit = {
+ assert(find(plan) {
+ case _: GlobalLimitExec => true
++ case _: CometGlobalLimitExec => true
+ case _ => false
+ }.isDefined)
}
-
-- test("root LIMIT preserves data ordering with CollectLimitExec") {
-+ test("root LIMIT preserves data ordering with CollectLimitExec",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
- val df = spark.range(10).orderBy($"id" % 8).limit(2)
- df.collect()
-@@ -88,7 +91,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
- }
+@@ -55,6 +57,11 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
+ find(plan) {
+ case GlobalLimitExec(_, s: SortExec, _) => !s.global
+ case GlobalLimitExec(_, ProjectExec(_, s: SortExec), _) => !s.global
++ case CometGlobalLimitExec(_, _, _, _, s: CometSortExec, _) =>
++ !s.originalPlan.asInstanceOf[SortExec].global
++ case CometGlobalLimitExec(_, _, _, _,
++ CometProjectExec(_, _, _, _, s: CometSortExec, _), _) =>
++ !s.originalPlan.asInstanceOf[SortExec].global
+ case _ => false
+ }.isDefined
}
-
-- test("middle LIMIT preserves data ordering with the extra sort") {
-+ test("middle LIMIT preserves data ordering with the extra sort",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- withSQLConf(
- SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
- // To trigger the bug, we have to disable the coalescing optimization.
Otherwise we use only
-@@ -117,7 +122,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
- assert(!hasLocalSort(physicalPlan))
- }
-
-- test("middle OFFSET preserves data ordering with the extra sort") {
-+ test("middle OFFSET preserves data ordering with the extra sort",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- val df = 1.to(10).map(v => v -> v).toDF("c1", "c2").orderBy($"c1" % 8)
- verifySortAdded(df.offset(2))
- verifySortAdded(df.filter($"c2" > rand()).offset(2))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index 743ec41dbe7..9f30d6c8e04 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -3490,7 +3495,7 @@ index 86c4e49f6f6..2e639e5f38d 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index f0f3f94b811..d64e4e54e22 100644
+index f0f3f94b811..c9d0ecfec41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
@@ -3547,7 +3552,7 @@ index f0f3f94b811..d64e4e54e22 100644
}
}
-@@ -248,8 +271,33 @@ private[sql] trait SQLTestUtilsBase
+@@ -248,8 +271,24 @@ private[sql] trait SQLTestUtilsBase
override protected def converter: ColumnNodeToExpressionConverter =
self.spark.converter
}
@@ -3557,15 +3562,6 @@ index f0f3f94b811..d64e4e54e22 100644
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
+
+ /**
-+ * Whether to enable ansi mode This is only effective when
-+ * [[isCometEnabled]] returns true.
-+ */
-+ protected def enableCometAnsiMode: Boolean = {
-+ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
-+ v != null && v.toBoolean
-+ }
-+
-+ /**
+ * Whether Spark should only apply Comet scan optimization. This is only
effective when
+ * [[isCometEnabled]] returns true.
+ */
@@ -3581,7 +3577,7 @@ index f0f3f94b811..d64e4e54e22 100644
super.withSQLConf(pairs: _*)(f)
}
-@@ -451,6 +499,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -451,6 +490,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -3591,10 +3587,10 @@ index f0f3f94b811..d64e4e54e22 100644
spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index 245219c1756..7d2ef1b9145 100644
+index 245219c1756..a611836f086 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -75,6 +75,31 @@ trait SharedSparkSessionBase
+@@ -75,6 +75,27 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules
such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
@@ -3618,10 +3614,6 @@ index 245219c1756..7d2ef1b9145 100644
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
-+ if (enableCometAnsiMode) {
-+ conf
-+ .set("spark.sql.ansi.enabled", "true")
-+ }
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
@@ -3662,19 +3654,10 @@ index 52abd248f3a..7a199931a08 100644
case d: DynamicPruningExpression => d.child
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
-index 4b27082e188..09f591dfed3 100644
+index 4b27082e188..6710c90c789 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
-@@ -17,7 +17,7 @@
-
- package org.apache.spark.sql.hive
-
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreComet, QueryTest, Row}
- import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
- import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
- import org.apache.spark.sql.hive.test.TestHiveSingleton
-@@ -147,11 +147,15 @@ class HiveUDFDynamicLoadSuite extends QueryTest with
SQLTestUtils with TestHiveS
+@@ -147,7 +147,9 @@ class HiveUDFDynamicLoadSuite extends QueryTest with
SQLTestUtils with TestHiveS
// This jar file should not be placed to the classpath.
val jarPath = "src/test/noclasspath/hive-test-udfs.jar"
@@ -3685,13 +3668,6 @@ index 4b27082e188..09f591dfed3 100644
val jarUrl = s"file://${System.getProperty("user.dir")}/$jarPath"
test("Spark should be able to run Hive UDF using jar regardless of " +
-- s"current thread context classloader (${udfInfo.identifier}") {
-+ s"current thread context classloader (${udfInfo.identifier}",
-+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
-+ "https://github.com/apache/datafusion-comet/issues/1948")) {
- Utils.withContextClassLoader(Utils.getSparkClassLoader) {
- withUserDefinedFunction(udfInfo.funcName -> false) {
- val sparkClassLoader = Thread.currentThread().getContextClassLoader
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index cc7bb193731..06555d48da7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -3729,10 +3705,10 @@ index b67370f6eb9..746b3974b29 100644
override def beforeEach(): Unit = {
super.beforeEach()
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-index a394d0b7393..d29b3058897 100644
+index a394d0b7393..a4bc3d3fd8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-@@ -53,24 +53,47 @@ object TestHive
+@@ -53,24 +53,41 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
@@ -3784,12 +3760,6 @@ index a394d0b7393..d29b3058897 100644
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
-+
-+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
-+ if (a != null && a.toBoolean) {
-+ conf
-+ .set("spark.sql.ansi.enabled", "true")
-+ }
+ }
+
+ conf
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]