mbutrovich commented on PR #4229:
URL: https://github.com/apache/datafusion-comet/pull/4229#issuecomment-4391527687

   Nice reduction in ignored tests. One concern on scope.
   
   The three-case match in `replace_with_spark_cast` exactly mirrors the three 
Comet-specific permissive branches in Comet's `TypeUtil.checkParquetType` 
(`INT32→Long` and `FLOAT→Double` gated on `allowTypePromotion`, `INT32→Double` 
gated on `isSpark40Plus`), so `native_datafusion` and `native_iceberg_compat` 
now line up for those three. Good.
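   To make the mapping concrete, here is a minimal sketch of the gating being mirrored. The names (`PromotionGates`, `allowed`) are illustrative only, not Comet's actual API; only the three `(physical, target)` pairs and their flags come from `TypeUtil.checkParquetType` as described above:

   ```scala
   // Illustrative sketch only: object/method names are hypothetical; the three
   // gated pairs are the permissive branches from TypeUtil.checkParquetType.
   object PromotionGates {
     sealed trait Physical
     case object Int32 extends Physical
     case object Float extends Physical

     sealed trait Target
     case object LongType extends Target
     case object DoubleType extends Target

     def allowed(
         physical: Physical,
         target: Target,
         allowTypePromotion: Boolean,
         isSpark40Plus: Boolean): Boolean = (physical, target) match {
       case (Int32, LongType)   => allowTypePromotion // INT32 -> Long
       case (Float, DoubleType) => allowTypePromotion // FLOAT -> Double
       case (Int32, DoubleType) => isSpark40Plus      // INT32 -> Double (Spark 4.0+)
       case _                   => false              // everything else falls through
     }
   }
   ```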
   
   But `native_datafusion` also silently accepts a bunch of conversions that 
**Spark's vectorized reader rejects on every supported version** (Spark's 
`ParquetVectorUpdaterFactory.getUpdater` falls through to 
`constructConvertNotSupportedException`). From Spark's 
`ParquetTypeWideningSuite` `expectError = true` list on 4.0.2: `Long→Int`, 
`Double→Float`, `Float→Long`, `Long→Double`, `Int→Float`, `Int→TimestampType`, 
`Date→TimestampType`. None of these is gated by the new flag, and none of them 
throws today.
   
   I ran a probe against this PR on Spark 3.5 with 
`COMET_NATIVE_SCAN_IMPL=native_datafusion`, sweeping 
`spark.comet.schemaEvolution.enabled` on and off. Results:
   
   | Case | Written | Spark ref behavior | schemaEvolution=false | schemaEvolution=true |
   |---|---|---|---|---|
   | `int→long` | `[1, 2, 3]` | throws (3.x) / ok (4.0) | **throws** ✅ | `[1, 2, 3]` |
   | `float→double` | `[1.0, 2.0, 3.0]` | throws (3.x) / ok (4.0) | **throws** ✅ | `[1.0, 2.0, 3.0]` |
   | `int→double` | `[1, 2, 3]` | throws (3.x) / ok (4.0) | **throws** ✅ | `[1.0, 2.0, 3.0]` |
   | `long→int` (narrowing) | `[1, 2, 3, 2147483652]` | throws | `[1, 2, 3, -2147483644]` | `[1, 2, 3, -2147483644]` |
   | `double→float` (narrowing) | `[1.5, 2.5, 1e40]` | throws | `[1.5, 2.5, Infinity]` | `[1.5, 2.5, Infinity]` |
   | `float→long` | `[1.5, 2.5]` | throws | `[1, 2]` (truncated) | `[1, 2]` (truncated) |
   | `long→double` | `[1, 2, 2^54+1]` | throws | `[1.0, 2.0, 1.8014398509481984E16]` (lost +1) | same |
   | `int→float` | `[1, 2, 2^25+1]` | throws | `[1.0, 2.0, 3.3554432E7]` (lost +1) | same |
   | `int→timestamp` | `[1, 2, 3]` | throws | `[1969-12-31 16:00:01 … 03]` (PST, int-as-seconds) | same |
   | `double→long` | `[1.0, 2.0, 3.0]` | throws | `[1, 2, 3]` | `[1, 2, 3]` |
   
   The top three rows are what the PR fixes, and they look right under both settings. The bottom seven are wrong-answer paths under both settings: silent overflow on narrowing, silent precision loss on widenings Spark doesn't allow, and silent reinterpretation of raw ints as epoch seconds for `int→timestamp`. These are the same class of gap that #3720 enumerates for `STRING→INT` and decimal precision narrowing, just for primitive-to-primitive conversions.
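   The narrowing and precision-loss values in the table are just plain JVM conversion semantics, which is a quick way to sanity-check the probe output independently of Spark:

   ```scala
   // Reproduces the wrong-answer values from the table with plain JVM casts
   // (no Spark involved): narrowing wraps, widening past the mantissa drops bits.
   object WrongAnswerMath extends App {
     // long -> int narrowing: 2147483652 wraps to -2147483644
     println((Int.MaxValue.toLong + 5L).toInt)
     // long -> double: 2^54 + 1 rounds to 1.8014398509481984E16, the +1 is lost
     println(((1L << 54) + 1L).toDouble)
     // int -> float: 2^25 + 1 rounds to 3.3554432E7, the +1 is lost
     println(((1 << 25) + 1).toFloat)
   }
   ```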
   
   Not asking you to fix all of them in this PR. But I think the framing in the 
commit message and code comment (`mirrors TypeUtil.checkParquetType`) 
undersells the remaining surface. Two options worth considering:
   
   1. Invert the check to an allowlist of Spark-supported `(physical, target)` 
pairs (essentially mirror the accept cases in Spark's 
`ParquetVectorUpdaterFactory.getUpdater` per Spark version), so anything else 
raises `ParquetSchemaConvert`. This closes the whole category.
   2. Land this as-is and file a followup issue tracking the seven cases above, 
linking this probe so behavior is captured.
   
   Either is fine by me. I'd lean toward (2) to keep this PR scoped.
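   For option (1), I'm picturing something along these lines. This is a hedged sketch with made-up names (`WideningAllowlist`, `isAllowed`), not the real accept arms of `ParquetVectorUpdaterFactory.getUpdater`; the pair set below covers only the three widenings from this PR, and a real version would transcribe the full accept set from Spark per version:

   ```scala
   // Hypothetical allowlist shape for option (1). Names and the pair encoding
   // are illustrative; only the three pairs come from this PR's changes.
   object WideningAllowlist {
     // (parquet physical type, Spark read type), as plain strings for the sketch
     private val widenings: Set[(String, String)] =
       Set(("INT32", "bigint"), ("INT32", "double"), ("FLOAT", "double"))

     def isAllowed(physical: String, readAs: String, allowTypePromotion: Boolean): Boolean =
       allowTypePromotion && widenings((physical, readAs))
     // Anything not allowed here (and not an exact type match, elided in this
     // sketch) would raise ParquetSchemaConvert instead of silently converting.
   }
   ```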
   
   ---
   
   Probe used (slimmed, put under 
`spark/src/test/scala/org/apache/comet/parquet/`, runs with `./mvnw test 
-Pspark-3.5 -Dtest=none 
-Dsuites=org.apache.comet.parquet.TypePromotionProbeSuite 
-Dscalastyle.skip=true`):
   
   ```scala
   package org.apache.comet.parquet
   
   import scala.util.Try
   import org.apache.spark.sql.{CometTestBase, DataFrame}
   import org.apache.spark.sql.internal.SQLConf
   import org.apache.comet.CometConf
   
   class TypePromotionProbeSuite extends CometTestBase {
     import testImplicits._
   
     private def probe(label: String)(body: => Any): Unit = {
       val result = Try(body)
       // scalastyle:off println
       println(s"[PROBE] $label -> ${result match {
           case scala.util.Success(v) => s"OK value=$v"
           case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
         }}")
       // scalastyle:on println
     }
   
     private def runAll(ev: Boolean): Unit = withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
       CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
       SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
        def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
          probe(s"$label (ev=$ev)") {
            withTempPath { dir =>
              df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
              spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
                .collect().map(_.get(0)).toSeq
            }
          }
        run("int->long", Seq(1, 2, 3).toDF("c"), "int", "bigint")
        run("float->double", Seq(1.0f, 2.0f, 3.0f).toDF("c"), "float", "double")
        run("int->double", Seq(1, 2, 3).toDF("c"), "int", "double")
        run("long->int narrowing", Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
        run("double->float narrowing", Seq(1.5, 2.5, 1e40).toDF("c"), "double", "float")
        run("float->long", Seq(1.5f, 2.5f).toDF("c"), "float", "bigint")
        run("long->double", Seq(1L, 2L, (1L << 54) + 1L).toDF("c"), "bigint", "double")
        run("int->float", Seq(1, 2, (1 << 25) + 1).toDF("c"), "int", "float")
        run("int->timestamp", Seq(1, 2, 3).toDF("c"), "int", "timestamp")
        run("double->long", Seq(1.0, 2.0, 3.0).toDF("c"), "double", "bigint")
     }
   
     test("probe ev=false") { runAll(ev = false) }
     test("probe ev=true")  { runAll(ev = true) }
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

