adriangb commented on issue #21968:
URL: https://github.com/apache/datafusion/issues/21968#issuecomment-4364104276

   > _Exploration was human-directed; the writeup below is AI-generated.
   > Driver: @adriangb. The accompanying changes live in two draft PRs
   > (not meant to land as-is — purely for visibility while the
   > investigation continues):_
   >
   > - DataFusion: https://github.com/apache/datafusion/pull/21987
   > - arrow-rs: https://github.com/apache/arrow-rs/pull/9882
   
   # Wide-schema parquet read perf — investigation
   
   ## Setup
   
   I generated the wide-schema benchmark suite (now committed on the
   draft branch as `benchmarks/sql_benchmarks/wide_schema/`): two copies
   of the same 256 parquet files / 50 k rows each — one with **8
   columns**, one with **1024 columns**. Identical row counts, identical
   file counts, identical bytes scanned. Q04 is the most stressful:
   
   ```sql
   SELECT id, ts FROM events
   WHERE category = 'c0' AND flag = 'f0' AND id % 1000 = 0;
   ```
   
   Profiled with samply, all hotspots resolved with `--unstable-presymbolicate`.
   
   ## Worst offenders identified
   
   1. **Default 50 MB metadata cache thrashes badly.** Per-file
      `ParquetMetaData` is ~1.5 MB (page index dominates), dataset total
      ~400 MB. The default fits ~30 of 256 files — *enough* to make
      threads contend on the cache mutex, *not enough* to stop
      re-parsing. Cache-size sweep showed 50 MB is the **worst** regime:
      disabling the cache (0) is faster (~100 ms hot vs ~136 ms hot at
      50 MB) because the lock contention disappears. Sizing at 400 MB+
      (everything fits) roughly halves hot-query time to ~55 ms.
   
   2. **`statistics_from_parquet_metadata` was O(N²) per file.** It
      iterated every logical field and called `StatisticsConverter::try_new`,
      which itself did an O(N) scan in `parquet_column` (the comment
      said _"this could be made more efficient (#TBD)"_). 1024² × 256 ≈
      268 M ops/query just to set up stats.
   
   3. **`ArrowReaderMetadata::try_new` runs per file open.** Walks every
      parquet leaf to build the arrow `Schema` + dremel `ParquetField`.
      ~190 µs/file at 1024 cols. With 256 files / 12 threads = ~4 ms
      wall per query.
   
   4. **Coercion forces a second arrow-schema rebuild.** With
      `schema_force_view_types=true` (default),
      `apply_file_schema_type_coercions` returns a new schema ⇒
      `try_new` runs *again* per file. Another ~190 µs/file.
   
   5. **`apply_file_schema_type_coercions` always built a 1024-entry
      HashMap** even when the early-return condition would discard it.
   
   6. **`DefaultFilesMetadataCache::evict_entries`** called full
      structural `memory_size()` walks on every put / remove / eviction.
   
   7. **`collect_statistics=true` (default) is the cold-path tax.**
      Eagerly fetches metadata + computes per-column stats for every
      file at planning time, even when the query touches only 4 of 1024
      columns. For Q04 the inferred stats are then unused
      (`files_ranges_pruned_statistics: 267 → 267`).
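
   The fix for item 2 is structural: pay O(N) once to build a name → leaf-index
   map, then resolve each field in O(1). A minimal std-only sketch of the access
   pattern — `LeafColumn` and both helpers are illustrative stand-ins, not the
   arrow-rs API:

   ```rust
   use std::collections::HashMap;

   // Hypothetical stand-in for a resolved parquet leaf column.
   struct LeafColumn {
       path: String,
   }

   // Before: each of the N logical fields triggers a linear scan over all
   // N leaves — O(N^2) per file.
   fn leaf_index_linear(leaves: &[LeafColumn], name: &str) -> Option<usize> {
       leaves.iter().position(|l| l.path == name)
   }

   // After: build the name -> index map once (O(N)); every subsequent
   // lookup is O(1), so per-file setup drops to O(N) overall.
   fn build_leaf_index(leaves: &[LeafColumn]) -> HashMap<String, usize> {
       leaves
           .iter()
           .enumerate()
           .map(|(i, l)| (l.path.clone(), i))
           .collect()
   }
   ```

   At 1024 columns × 256 files, that is the difference between ~268 M scan
   steps per query and ~262 k.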
   
   ## Fixes shipped (on the draft branches)
   
   **arrow-rs ([#9882](https://github.com/apache/arrow-rs/pull/9882))**
   - `SchemaDescriptor::root_to_first_leaf`: precomputed at construction
     ⇒ `parquet_column` is now O(1).
   - `StatisticsConverter::from_arrow_field`: low-overhead constructor
     taking a resolved `(field, leaf_idx)` pair.
   - `ArrowReaderMetadata::from_field_levels`: package precomputed parts
     so callers can skip the per-leaf walk.
   - `parquet_to_arrow_schema_and_field_levels`: produces `(Schema,
     FieldLevels)` in one walk.
   - `AsyncFileReader::get_arrow_reader_metadata`: new trait method (default
     delegates to `try_new`) so cache-aware readers can short-circuit.
     `load_async` now goes through it.
   - Public accessors on `ArrowReaderOptions` so callers can decide
     whether their cached arrow view applies.
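
   The `get_arrow_reader_metadata` hook boils down to a trait method with an
   expensive default that cache-aware implementations override. A simplified
   std-only sketch of the shape — type and method names here are stand-ins for
   the real arrow-rs ones:

   ```rust
   use std::sync::Arc;

   // Stand-in for ArrowReaderMetadata.
   #[derive(Clone)]
   struct ReaderMetadata {
       num_columns: usize,
   }

   trait FileReader {
       // Default path: rebuild metadata by walking every parquet leaf
       // (the ~190 µs/file cost at 1024 columns).
       fn get_arrow_reader_metadata(&self) -> ReaderMetadata {
           ReaderMetadata { num_columns: self.walk_footer() }
       }
       fn walk_footer(&self) -> usize;
   }

   // A cache-aware reader overrides the default and short-circuits to a
   // cached value, skipping the per-leaf walk entirely.
   struct CachedReader {
       cached: Arc<ReaderMetadata>,
   }

   impl FileReader for CachedReader {
       fn get_arrow_reader_metadata(&self) -> ReaderMetadata {
           (*self.cached).clone()
       }
       fn walk_footer(&self) -> usize {
           unreachable!("cached path never re-parses the footer")
       }
   }
   ```

   The default keeps existing `AsyncFileReader` implementations source-compatible;
   only readers that actually hold a cache need to opt in.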
   
   **DataFusion ([#21987](https://github.com/apache/datafusion/pull/21987))**
   - `statistics_from_parquet_metadata`: precompute logical → parquet
     leaf indices once (O(N)) and use `from_arrow_field` in the loop.
     O(N²)/file → O(N)/file.
   - `CachedParquetMetaData` carries a `OnceLock<ArrowReaderMetadata>`
     (cache hit ⇒ `Arc::clone`, ~4 ns) plus a single-slot
     `Mutex<Option<(supplied_schema_ptr, ArrowReaderMetadata)>>` for the
     post-coercion build.
   - `CachedParquetFileReader::get_arrow_reader_metadata` overrides the
     new trait method and serves both base and post-coercion ARMs from
     cache.
   - `prepare_filters` made async so the post-coercion rebuild also
     routes through the cache-aware reader.
   - `apply_file_schema_type_coercions`: cheap "any view/string?" first
     pass (only build the lookup HashMap when needed); return `None` when
     nothing actually changed.
   - `DefaultFilesMetadataCache` stores `memory_size` next to each entry —
     no more re-walks on put / evict / remove.
   - New `wide_schema_microbench` covering `try_new` vs `clone_cached`,
     `apply_file_schema_type_coercions` no-op, `PruningPredicate::try_new`,
     `StatisticsConverter::try_new` vs `from_arrow_field`.
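
   The `OnceLock` cache-hit path can be sketched in isolation (types are
   simplified stand-ins; the real entry also carries the single-slot mutex for
   the post-coercion build):

   ```rust
   use std::sync::{Arc, OnceLock};

   // Stand-in for ArrowReaderMetadata.
   struct Metadata {
       cols: usize,
   }

   // Cache entry: the arrow view is built at most once per cached file.
   struct CachedEntry {
       arrow: OnceLock<Arc<Metadata>>,
   }

   impl CachedEntry {
       fn new() -> Self {
           CachedEntry { arrow: OnceLock::new() }
       }

       // First caller pays for the schema walk; every later caller gets an
       // Arc::clone (a few ns), never a second build.
       fn arrow_metadata(&self, build: impl FnOnce() -> Metadata) -> Arc<Metadata> {
           Arc::clone(self.arrow.get_or_init(|| Arc::new(build())))
       }
   }
   ```

   `OnceLock` also makes the lazy initialization race-free across the reader
   threads without holding the cache mutex during the build.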
   
   ## Results (Q04, 1024 cols × 256 files, profiling build)
   
   | Scenario | Before | After | Δ |
   |---|---|---|---|
   | narrow control, hot | ~25 ms | ~24 ms | ~0 |
   | **wide @50 M cache (default), cold** | ~1010 ms | **~615 ms** | **−39%** |
   | **wide @50 M cache (default), hot** | ~108 ms | **~87 ms** | **−19%** |
   | wide @50 M, `collect_statistics=false`, **cold** | ~1010 ms | **~257 ms** | **−75%** |
   
   For reference (warm-fitting cache):
   - wide @2 G cold: 830 → 510 ms (−39%)
   - wide @2 G hot: 47 → 38 ms (−19%)
   
   ## Microbench numbers (from the new `wide_schema_microbench`)
   
   | Cols | `ArrowReaderMetadata::try_new` | `clone_cached` | Speedup |
   |---|---|---|---|
   | 8 | 2.0 µs | 4.4 ns | 450× |
   | 64 | 12 µs | 4.4 ns | 2700× |
   | 256 | 47 µs | 4.4 ns | 10700× |
   | 1024 | 190 µs | 4.4 ns | 43000× |
   
   `try_new` is O(N) at ~190 ns/col; the cached clone path is O(1).
   `PruningPredicate::try_new` against a 1-column predicate is already
   ~2-3 µs and **does not scale with schema width** — predicate-driven,
   not schema-driven, so that part is already where we want it.
   
   ## Open / next
   
   - The `collect_statistics=true` default eats most of the cold-path
     budget. The structural fix is **lazy / projection-aware
     `Statistics::column_statistics`** so the optimizer only pays for
     the columns it actually inspects. That'd let the default stay safe
     without paying for unused columns.
   - The page-index-skip experiment helped cold (−12% to −17%) but regressed
     warm because `load_page_index` doesn't update the cache. The fix
     is to also persist page-index-loaded metadata back into the cache.
   - `Schema::index_of` / `Fields::find` are O(N) linear scans in arrow.
     Profile shows them at <1% on Q04 — small now, but they're in
     enough hot loops that adding a lazy name-index hashmap to `Fields`
     would future-proof anything that does name lookups in inner loops.
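
   A lazy name index of the kind proposed for `Fields` could look like this
   (hypothetical sketch, not arrow's actual type; duplicate field names are
   ignored for brevity):

   ```rust
   use std::collections::HashMap;
   use std::sync::OnceLock;

   // Hypothetical field list that builds its name index on first lookup.
   struct Fields {
       names: Vec<String>,
       by_name: OnceLock<HashMap<String, usize>>,
   }

   impl Fields {
       fn new(names: Vec<String>) -> Self {
           Fields { names, by_name: OnceLock::new() }
       }

       // First call pays O(N) to build the map; every later `index_of`
       // is an O(1) hash lookup instead of a linear scan.
       fn index_of(&self, name: &str) -> Option<usize> {
           self.by_name
               .get_or_init(|| {
                   self.names
                       .iter()
                       .enumerate()
                       .map(|(i, n)| (n.clone(), i))
                       .collect()
               })
               .get(name)
               .copied()
       }
   }
   ```

   Callers that never look up by name never pay for the map, which keeps the
   narrow-schema path unchanged.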
   
   Full investigation log lives at `report.md` on the DataFusion branch
   ([direct link](https://github.com/pydantic/datafusion/blob/adrian/wide-schema-perf/report.md)).

