mbutrovich commented on issue #4189: URL: https://github.com/apache/datafusion-comet/issues/4189#issuecomment-4373293771
Investigating with Claude: thanks for the write-up. I traced the field-ID handling across our scan paths to make sure I understood the scope before responding.

## Field-ID handling today

- **Legacy V1 (`CometParquetFileFormat`)**: honors field IDs via [`CometParquetReadSupport`](https://github.com/apache/datafusion-comet/blob/main/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala) (see `useFieldId` at lines 126, 203, 245, 351).
- **`SCAN_NATIVE_DATAFUSION`**: the gate at [`CometScanRule.scala#L235-L239`](https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala#L235-L239) falls back to Spark when `spark.sql.parquet.fieldId.read.enabled=true` and `requiredSchema` has IDs.
- **`SCAN_NATIVE_ICEBERG_COMPAT`**: also honors field IDs. The relation's `fileFormat` is swapped to `CometParquetFileFormat` in [`CometScanExec.scala#L558`](https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala#L558), and `useFieldId` is plumbed into `NativeBatchReader` at [`CometParquetFileFormat.scala#L97,L158`](https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala#L97). The reader rewrites the Spark schema by ID at [`NativeBatchReader.java#L398-L401`](https://github.com/apache/datafusion-comet/blob/main/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java#L398-L401) (`getSparkSchemaByFieldId`, derived from `CometParquetReadSupport.matchFieldId`).
- **`SCAN_NATIVE_DELTA_COMPAT` (PR #3932)**: this is the actual gap. The mirrored gate never fires because Delta strips field-ID metadata from `requiredSchema` before it reaches Comet.

## Test coverage

There are no field-ID exclusions in `dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff`, so Spark's `ParquetFieldIdIOSuite` and `ParquetFieldIdSchemaSuite` run unmodified. They pass via the legacy V1 reader or the native_datafusion fallback. The Comet-side test [`ParquetReadSuite.scala#L1356`](https://github.com/apache/datafusion-comet/blob/main/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala#L1356) writes and reads with matching names, so it would pass even without by-ID matching; it does not protect against the rename-detection scenario.

## Suggested fix for `SCAN_NATIVE_DATAFUSION`

Rather than just plugging the Delta-specific gate, we could honor field IDs end to end in the native path and drop the fallback. arrow-rs only round-trips the metadata; it does not resolve columns by ID:

- `PARQUET:field_id` is copied from the Parquet footer into Arrow field metadata when reading ([`parquet/src/arrow/schema/complex.rs#L766-L770`](https://github.com/apache/arrow-rs/blob/main/parquet/src/arrow/schema/complex.rs#L766), constant at [`mod.rs#L227`](https://github.com/apache/arrow-rs/blob/main/parquet/src/arrow/mod.rs#L227)) and read back when writing.
- `ProjectionMask::leaves(parquet_schema, indices)` projects by physical leaf index. `ProjectionMask::columns(...)` and `ArrowReaderOptions::with_schema(...)` are both name/position-based; neither resolves columns by ID.

iceberg-rust builds ID-based projection on top of those primitives in [`reader/projection.rs#L144-L213`](https://github.com/apache/iceberg-rust/blob/main/crates/iceberg/src/arrow/reader/projection.rs#L144): walk the file's Arrow schema, filter leaves whose `PARQUET:field_id` metadata matches a requested ID, build a `ProjectionMask::leaves(...)` from the matched indices, and let a downstream transformer reorder/rename to the requested schema. Comet's native side already understands `PARQUET:field_id` in [`native/core/src/parquet/cast_column.rs#L572-L575`](https://github.com/apache/datafusion-comet/blob/main/native/core/src/parquet/cast_column.rs#L572), so the post-projection rename/cast layer largely works.

Concrete hookup:

1. **JVM side**: when serializing the scan, attach the Spark field IDs as proto metadata (we already pass `parquet.field.id` through for the Iceberg path).
2. **Native side**: in [`native/core/src/parquet/parquet_exec.rs`](https://github.com/apache/datafusion-comet/blob/main/native/core/src/parquet/parquet_exec.rs), when `useFieldId` is set and the required schema carries IDs, build the projection from the file's footer using leaf field IDs (mirroring iceberg-rust's `get_arrow_projection_mask_with_field_ids`) instead of by name.

Net: one new helper in the Rust scan path plus ID propagation in the proto. No JVM-side footer reads, no arrow-rs changes, and no per-file schema rewrites at planning time. This also fixes the Delta gap reported here as a side effect, since correctness no longer depends on the JVM-side gate seeing IDs on `requiredSchema`.
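To make the native-side step concrete, here is a minimal, self-contained sketch of the ID-based leaf selection described above. `Leaf` and `leaves_by_field_id` are hypothetical names standing in for a walk over the file's arrow-rs schema; the real helper would read the `PARQUET:field_id` metadata off each Arrow leaf field and feed the resulting indices into `ProjectionMask::leaves(...)`.

```rust
// Simplified model: a Parquet leaf column whose Arrow field may carry
// PARQUET:field_id metadata. Real code would walk the arrow-rs schema
// rather than a Vec.
#[derive(Debug)]
struct Leaf {
    name: &'static str,
    field_id: Option<i32>, // value of the PARQUET:field_id metadata, if present
}

/// Map requested field IDs to physical leaf indices, preserving the
/// requested order. Returns None if any requested ID is missing from the
/// file, which a caller could treat as "fall back to name-based matching".
fn leaves_by_field_id(leaves: &[Leaf], requested: &[i32]) -> Option<Vec<usize>> {
    requested
        .iter()
        .map(|id| leaves.iter().position(|l| l.field_id == Some(*id)))
        .collect()
}

fn main() {
    // File written with IDs 1..3; column "b" was later renamed in the table
    // schema, so name-based matching would miss it.
    let file_leaves = vec![
        Leaf { name: "a", field_id: Some(1) },
        Leaf { name: "b", field_id: Some(2) },
        Leaf { name: "c", field_id: Some(3) },
    ];

    // The required schema asks for IDs 3 and 2.
    let indices = leaves_by_field_id(&file_leaves, &[3, 2]).unwrap();
    assert_eq!(indices, vec![2, 1]);

    // In the real hookup, `indices` would feed
    // ProjectionMask::leaves(parquet_schema, indices).
    println!("{:?}", indices);
}
```

The `Option` return keeps the fallback decision with the caller, which mirrors how the current JVM-side gate decides between Comet and Spark.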
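And a sketch of the downstream rename step (again hypothetical names, not Comet's actual code): after projecting by ID, the selected fields still carry the *file's* names, so a transformer renames them to the names the required schema expects, keyed by the shared field ID.

```rust
use std::collections::HashMap;

/// Rename projected fields to the required schema's names by field ID.
/// Falls back to the file's name when the ID is not in the required map.
fn rename_to_required(
    projected: &[(i32, String)],     // (field_id, file_name) after projection
    required: &HashMap<i32, String>, // field_id -> name in the required schema
) -> Vec<String> {
    projected
        .iter()
        .map(|(id, file_name)| required.get(id).cloned().unwrap_or_else(|| file_name.clone()))
        .collect()
}

fn main() {
    let projected = vec![(2, "b".to_string()), (3, "c".to_string())];
    let required: HashMap<i32, String> =
        [(2, "b_new".to_string()), (3, "c".to_string())].into_iter().collect();
    // The file's "b" surfaces as "b_new", matching the rename in the table schema.
    assert_eq!(rename_to_required(&projected, &required), vec!["b_new", "c"]);
}
```

In Comet this responsibility would presumably land in the existing rename/cast layer in `cast_column.rs`, which already reads `PARQUET:field_id`.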
