This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f6d84b138 fix: fall back from native_datafusion for duplicate fields 
in case-insensitive mode (#3687)
f6d84b138 is described below

commit f6d84b138edb9f0c7db820c9200aff79d40fb276
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 16 19:40:26 2026 -0600

    fix: fall back from native_datafusion for duplicate fields in 
case-insensitive mode (#3687)
---
 dev/diffs/3.5.8.diff                               | 35 +++++++++++++---------
 docs/source/contributor-guide/parquet_scans.md     |  4 +++
 .../org/apache/comet/rules/CometScanRule.scala     | 16 ++++++++++
 3 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 568b33e75..3ed7d9ce1 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -1986,7 +1986,7 @@ index 07e2849ce6f..3e73645b638 100644
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 8e88049f51e..49f2001dc6b 100644
+index 8e88049f51e..6150a556f9b 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 @@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
@@ -2075,17 +2075,24 @@ index 8e88049f51e..49f2001dc6b 100644
      val schema = StructType(Seq(
        StructField("a", IntegerType, nullable = false)
      ))
-@@ -1933,7 +1949,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
-     }
-   }
+@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest 
with ParquetTest with Shared
+           val e = intercept[SparkException] {
+             sql(s"select a from $tableName where b > 0").collect()
+           }
+-          assert(e.getCause.isInstanceOf[RuntimeException] && 
e.getCause.getMessage.contains(
+-            """Found duplicate field(s) "B": [B, b] in case-insensitive 
mode"""))
++          assert(e.getCause.isInstanceOf[RuntimeException])
++          val msg = e.getCause.getMessage
++          // native_datafusion produces a different error message for 
duplicate fields
++          assert(
++            msg.contains(
++              """Found duplicate field(s) "B": [B, b] in case-insensitive 
mode""") ||
++              msg.contains("Unable to get field named"),
++            s"Unexpected error message: $msg")
+         }
  
--  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode") {
-+  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311"))
 {
-     withTempPath { dir =>
-       val count = 10
-       val tableName = "spark_25207"
-@@ -1984,7 +2001,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+         withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2095,7 +2102,7 @@ index 8e88049f51e..49f2001dc6b 100644
      // block 1:
      //                      null count  min                                   
    max
      // page-0                         0  0                                    
     99
-@@ -2044,7 +2062,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
+@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
      }
    }
  
@@ -2105,7 +2112,7 @@ index 8e88049f51e..49f2001dc6b 100644
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(100).selectExpr("id * 2 AS id")
-@@ -2276,7 +2295,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
@@ -2118,7 +2125,7 @@ index 8e88049f51e..49f2001dc6b 100644
          } else {
            assert(selectedFilters.isEmpty, "There is filter pushed down")
          }
-@@ -2336,7 +2359,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
            assert(pushedParquetFilters.exists(_.getClass === filterClass),
              s"${pushedParquetFilters.map(_.getClass).toList} did not contain 
${filterClass}.")
  
diff --git a/docs/source/contributor-guide/parquet_scans.md 
b/docs/source/contributor-guide/parquet_scans.md
index c8e960a15..2a10bb111 100644
--- a/docs/source/contributor-guide/parquet_scans.md
+++ b/docs/source/contributor-guide/parquet_scans.md
@@ -62,6 +62,10 @@ cause Comet to fall back to Spark.
 - No support for `input_file_name()`, `input_file_block_start()`, or 
`input_file_block_length()` SQL functions.
   The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these 
functions cannot populate their values.
 - No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to 
`true`
+- No support for duplicate field names in case-insensitive mode. When the 
required or data schema contains
+  field names that differ only by case (e.g., `B` and `b`), Comet falls back 
to Spark. Note that duplicates
+  in the physical Parquet file that are not reflected in the table schema 
cannot be detected at plan time,
+  so DataFusion may produce a different error message than Spark in that case.
 
 The `native_iceberg_compat` scan has the following additional limitation that 
may produce incorrect results
 without falling back to Spark:
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 698b68777..c004d7728 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -222,6 +222,22 @@ case class CometScanRule(session: SparkSession)
       withInfo(scanExec, "Native DataFusion scan does not support Parquet 
field ID matching")
       return None
     }
+    // Case-insensitive mode with duplicate field names produces different 
errors
+    // in DataFusion vs Spark, so fall back to avoid incompatible error 
messages
+    if (!session.sessionState.conf.caseSensitiveAnalysis) {
+      val schemas = Seq(scanExec.requiredSchema, r.dataSchema)
+      for (schema <- schemas) {
+        val fieldNames =
+          schema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT))
+        if (fieldNames.length != fieldNames.distinct.length) {
+          withInfo(
+            scanExec,
+            "Native DataFusion scan does not support " +
+              "duplicate field names in case-insensitive mode")
+          return None
+        }
+      }
+    }
     if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
       return None
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to