mbutrovich commented on PR #22026: URL: https://github.com/apache/datafusion/pull/22026#issuecomment-4388122606
> @mbutrovich from high level perspective how `row_number` virtual column would work when reading multiple parquet files?

Thanks for the question @comphead!

`row_number` is **per-file**, not globally monotonic across a multi-file scan. Each opened file independently starts at 0. This matches Spark's `_metadata.row_index` / `_tmp_metadata_row_index` exactly, so Comet's consumer gets identical semantics.

**arrow-rs** ([`row_number.rs`](https://github.com/apache/arrow-rs/blob/main/parquet/src/arrow/array_reader/row_number.rs)): `RowNumberReader::try_new` builds offsets by summing `rg.num_rows()` across **the file's** row groups starting at 0. Offsets of surviving row groups are absolute within the file regardless of pruning.

**Spark** ([`ParquetRowIndexUtil.scala`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala)): the generator seeds from `PageReadStore.getRowIndexOffset` (file-relative, includes skipped rows) and counts within the file. The non-vectorized path uses `ParquetRecordReader.getCurrentRowIndex`, which has the same semantics.

**Concrete scenarios**

| Scenario | Values produced |
|---|---|
| Single file, 300 rows, 3 row groups, no pruning | `0..300` |
| Single file, middle row group stats-pruned | `0..100 ++ 200..300` (pinned by `test_row_index_with_row_group_skip`) |
| Two files A (100 rows) + B (100 rows) in one scan | A → `0..100`, B → `0..100` (values collide across files) |
| Partitioned table, multiple files per partition | Each file independently `0..N`; collisions across files |

Because values collide across files, callers that need a globally-unique row id pair `row_number` with `input_file_name()` (or the `_metadata.file_path` struct in Spark). Comet's `_tmp_metadata_row_index` follows this same convention, which is why the unblock in apache/datafusion-comet#3432 only needs per-file semantics.
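To make the per-file semantics concrete, here is a minimal standalone sketch in plain Rust. It does not use the arrow-rs or DataFusion APIs; `file_row_number_ranges` and `unique_row_ids` are hypothetical helper names, and the `(num_rows, keep)` tuples are an assumed stand-in for row-group metadata plus a pruning decision. It only models the arithmetic described above: offsets accumulate over every row group in the file, pruned groups leave gaps, and pairing the row number with the file path removes cross-file collisions.

```rust
use std::ops::Range;

/// Row-number ranges produced for ONE file (hypothetical helper).
/// Each entry is (rows in the row group, whether it survived pruning).
/// Offsets accumulate over all row groups, so a pruned group leaves a gap
/// instead of shifting later row numbers down.
fn file_row_number_ranges(row_groups: &[(u64, bool)]) -> Vec<Range<u64>> {
    let mut start = 0u64; // every file starts counting at 0
    let mut ranges = Vec::new();
    for &(num_rows, keep) in row_groups {
        let end = start + num_rows;
        if keep {
            ranges.push(start..end);
        }
        start = end; // advance even when the row group is skipped
    }
    ranges
}

/// Pair each per-file row number with its file path (hypothetical helper),
/// giving a key that is unique across the whole scan.
fn unique_row_ids(files: &[(&str, Vec<(u64, bool)>)]) -> Vec<(String, u64)> {
    let mut ids = Vec::new();
    for (path, row_groups) in files {
        for range in file_row_number_ranges(row_groups) {
            for row_number in range {
                ids.push((path.to_string(), row_number));
            }
        }
    }
    ids
}
```

With three 100-row row groups and the middle one pruned, `file_row_number_ranges` returns `0..100` and `200..300`, matching the second row of the table. For two 100-row files, the bare row numbers overlap, while the `(path, row_number)` pairs stay distinct.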
This is also the only semantics that makes sense at the `ParquetOpener` layer: the opener is invoked per file and has no view of the other files in the scan. A scan-global numbering would require the executor to know every file's row count up front, which defeats streaming/parallelism. A higher-level (`ListingTable` / SQL-layer) surface could layer scan-global numbering on top if ever wanted, but that's out of scope here.
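For illustration only, a scan-global numbering layered on top of the per-file values might look like the sketch below. It is not part of this PR or of any DataFusion API; `global_file_offsets` and `to_global_row_number` are hypothetical names. The point it shows is the dependency mentioned above: the offset for a file cannot be computed until the row counts of all earlier files in the scan order are known.

```rust
/// Starting global offset of each file: an exclusive prefix sum over the
/// row counts of the files in scan order (hypothetical helper).
/// Requires every earlier file's row count before any later file can start.
fn global_file_offsets(file_row_counts: &[u64]) -> Vec<u64> {
    let mut offsets = Vec::with_capacity(file_row_counts.len());
    let mut next = 0u64;
    for &count in file_row_counts {
        offsets.push(next);
        next += count;
    }
    offsets
}

/// Convert a per-file row number into a scan-global one (hypothetical helper).
fn to_global_row_number(file_offset: u64, per_file_row_number: u64) -> u64 {
    file_offset + per_file_row_number
}
```

Under these assumptions, files A and B with 100 rows each would map to global ranges `0..100` and `100..200`, but only once A's row count is available to whatever is reading B.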
