pepijnve commented on code in PR #740: URL: https://github.com/apache/arrow-site/pull/740#discussion_r2599319698
########## _posts/2025-12-07-parquet-late-materialization-deep-dive.md: ########## @@ -0,0 +1,311 @@ +--- +layout: post +title: "A Practical Dive Into Late Materialization in arrow-rs Parquet Reads" +description: "How arrow-rs pipelines predicates and projections to minimize work during Parquet scans" +date: "2025-12-07 00:00:00" +author: "<a href=\"https://github.com/hhhizzz\">Huang Qiwei</a> and <a href=\"https://github.com/alamb\">Andrew Lamb</a>" +categories: [application] +translations: + - language: 简体中文 + post_id: 2025-12-07-parquet-late-materialization-deep-dive-zh +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +This article dives into the decisions and pitfalls of implementing Late Materialization in the [Apache Parquet] reader from [`arrow-rs`] (the reader powering [Apache DataFusion] among other projects). We'll see how a seemingly humble file reader requires complex logic to evaluate predicates—effectively becoming a **tiny query engine** in its own right. + +[Apache Parquet]: https://parquet.apache.org/ +[Apache DataFusion]: https://datafusion.apache.org/ +[`arrow-rs`]: https://github.com/apache/arrow-rs + +## 1. Why Late Materialization? 
+ +Columnar reads are a constant battle between **I/O bandwidth** and **CPU decode costs**. While skipping data is generally good, the act of skipping itself carries a computational cost. The goal of the Parquet reader in `arrow-rs` is **pipeline-style late materialization**: evaluate predicates first, then access projected columns. For predicates that filter many rows, materializing after evaluation minimizes reads and decode work. + +The approach closely mirrors the **LM-pipelined** strategy from [Materialization Strategies in a Column-Oriented DBMS](https://www.cs.umd.edu/~abadi/papers/abadiicde2007.pdf) by Abadi et al.: interleaving predicates and data column access instead of reading all columns at once and trying to **stitch them back together** into rows. + +<figure style="text-align: center;"> + <img src="{{ site.baseurl }}/img/late-materialization/fig1.png" alt="LM-pipelined late materialization pipeline" width="100%" class="img-responsive"> +</figure> + +To evaluate a query like `SELECT B, C FROM table WHERE A > 10 AND B < 5` using late materialization, the reader follows these steps: + +1. Read column `A` and evaluate `A > 10` to build a `RowSelection` (a sparse mask) representing the initial set of surviving rows. +2. Use that `RowSelection` to read surviving values of column `B` and evaluate `B < 5` and update the `RowSelection` to make it even sparser. +3. Use the refined `RowSelection` to read column `C` (a projection column), decoding only the final surviving rows. + +The rest of this post zooms in on how the code makes this path work. + +--- + +## 2. Late Materialization in the Rust Parquet Reader + +### 2.1 LM-pipelined + +"LM-pipelined" might sound like something from a textbook. In `arrow-rs`, it simply refers to a pipeline that runs sequentially: "read predicate column → generate row selection → read data column". This contrasts with a **parallel** strategy, where all predicate columns are read simultaneously. 
While parallelism can maximize multi-core CPU usage, the pipelined approach is often superior in columnar storage because each filtering step drastically reduces the amount of data subsequent steps need to read and parse. + +The code is structured into a few core roles: + +- **[ReadPlan](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/read_plan.rs#L302) / [ReadPlanBuilder](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/read_plan.rs#L34)**: Encodes "which columns to read and with what row subset" into a plan. It does not pre-read all predicate columns. It reads one, tightens the selection, and then moves on. +- **[RowSelection](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L139)**: Two implementations: use [Run-length encoding] (RLE) (via [`RowSelector`]) to "skip/select N rows", or use an Arrow [`BooleanBuffer`] bitmask to filter rows. This is the core mechanism that carries sparsity through the pipeline. +- **[ArrayReader](https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/array_reader/mod.rs#L85)**: Responsible for decoding. It receives a [`RowSelection`] and decides which pages to read and which values to decode. + +[Run-length encoding]: https://en.wikipedia.org/wiki/Run-length_encoding +[`RowSelector`]: https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L66 +[`BooleanBuffer`]: https://github.com/apache/arrow-rs/blob/a67cd19fff65b6c995be9a5eae56845157d95301/arrow-buffer/src/buffer/boolean.rs#L37 + +[`RowSelection`] can switch dynamically between RLE and bitmasks. Bitmasks are faster when gaps are tiny and sparsity is high; RLE is friendlier to large, page-level skips. Details on this trade-off appear in section 3.1. 
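To make the two representations concrete, here is a minimal sketch that converts between a per-row boolean mask and run-length form. It uses only the standard library; `Run`, `mask_to_runs`, and `runs_to_mask` are hypothetical names for illustration and are not the arrow-rs `RowSelector` or `BooleanBuffer` APIs.

```rust
/// Hypothetical stand-in for a run-length selector (NOT the arrow-rs `RowSelector`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Run {
    /// Number of consecutive rows covered by this run.
    len: usize,
    /// `true` means the rows are skipped, `false` means they are selected.
    skip: bool,
}

/// Collapse a per-row mask (`true` = keep the row) into run-length form.
fn mask_to_runs(mask: &[bool]) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &keep in mask {
        let skip = !keep;
        match runs.last_mut() {
            // Same kind as the previous run: extend it.
            Some(last) if last.skip == skip => last.len += 1,
            // Different kind (or first row): start a new run.
            _ => runs.push(Run { len: 1, skip }),
        }
    }
    runs
}

/// Expand run-length form back into a per-row mask.
fn runs_to_mask(runs: &[Run]) -> Vec<bool> {
    runs.iter()
        .flat_map(|r| std::iter::repeat(!r.skip).take(r.len))
        .collect()
}
```

In this sketch, a mask that alternates on every row produces one run per row, so the run-length form grows with the number of rows, while a single long skipped range collapses to one run. That is the intuition behind preferring bitmasks for tiny gaps and RLE for large, page-level skips.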
+ +Consider again the query: `SELECT B, C FROM table WHERE A > 10 AND B < 5`: + +1. **Initial**: `selection = None` (equivalent to "select all"). +2. **Read A**: `ArrayReader` decodes column A in batches; the predicate builds a boolean mask; [`RowSelection::from_filters`] turns it into a sparse selection. +3. **Tighten**: [`ReadPlanBuilder::with_predicate`] chains the new mask via [`RowSelection::and_then`]. +4. **Read B**: Build column B's reader with the current `selection`; the reader only performs I/O and decoding for selected rows, producing an even sparser mask. +5. **Merge**: `selection = selection.and_then(selection_b)`; projection columns now decode a tiny row set. + +[`RowSelection::from_filters`]: https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L149 +[`ReadPlanBuilder::with_predicate`]: https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/read_plan.rs#L143 +[`RowSelection::and_then`]: https://github.com/apache/arrow-rs/blob/bab30ae3d61509aa8c73db33010844d440226af2/parquet/src/arrow/arrow_reader/selection.rs#L345 + +**Code locations and sketch**: + +```rust +// Close to the flow in read_plan.rs (simplified) +let mut builder = ReadPlanBuilder::new(batch_size); + +// 1) Inject external pruning (e.g., Page Index): +builder = builder.with_selection(page_index_selection); + +// 2) Append predicates serially: +for predicate in predicates { + builder = builder.with_predicate(predicate); // internally uses RowSelection::and_then +} + +// 3) Build readers; all ArrayReaders share the final selection strategy +let plan = builder.build(); +let reader = ParquetRecordBatchReader::new(array_reader, plan); +``` + +I've drawn a simple flowchart that illustrates this flow to help you understand: + +<figure style="text-align: center;"> + <img src="{{ site.baseurl }}/img/late-materialization/fig2.jpg" alt="Predicate-first pipeline flow" 
width="100%" class="img-responsive"> +</figure> + +Now that you understand how this pipeline works, the next question is **how to represent and combine these sparse selections** (the **Row Mask** in the diagram), which is where `RowSelection` comes in. + +### 2.2 Combining row selectors (`RowSelection::and_then`) + +[`RowSelection`] represents the set of rows that will eventually be produced. It currently uses RLE (`RowSelector::select/skip(len)`) to describe sparse ranges. [`RowSelection::and_then`] is the core operator for "apply one selection to another": the left-hand argument is "rows already passed" and the right-hand argument is "which of the passed rows also pass the second filter." The output is their boolean AND. + +[`RowSelection`]: https://github.com/apache/arrow-rs/blob/ce4edd53203eb4bca96c10ebf3d2118299dad006/parquet/src/arrow/arrow_reader/selection.rs#L139 +[`RowSelection::and_then`]: https://github.com/apache/arrow-rs/blob/ce4edd53203eb4bca96c10ebf3d2118299dad006/parquet/src/arrow/arrow_reader/selection.rs#L345 + +**Walkthrough Example**: + +* **Input Selection A (already filtered)**: `[Skip 100, Select 50, Skip 50]` (physical rows 100-150 are selected) +* **Selection B (filters within A)**: `[Select 10, Skip 40]` (within the 50 selected rows, only the first 10 survive B) +* **Result**: `[Skip 100, Select 10, Skip 90]`. + +**How it runs**: +Think of it like a zipper: we traverse both lists simultaneously, as shown below: + +1. **First 100 rows**: A is Skip → result is Skip 100. +2. **Next 50 rows**: A is Select. Look at B: + * B's first 10 are Select → result Select 10. + * B's remaining 40 are Skip → result Skip 40. +3. **Final 50 rows**: A is Skip → result Skip 50. + +**Result**: `[Skip 100, Select 10, Skip 90]`. 
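The zipper traversal above can be sketched with plain standard-library Rust. This is an illustration of the two-pointer idea only, not the arrow-rs implementation: `Run`, `push_run`, and `and_then_sketch` are hypothetical names, adjacent runs of the same kind are merged, and any rows left over once the inner selection runs out are treated as skipped (an assumption made for this sketch).

```rust
/// Hypothetical stand-in for a run-length selector (NOT the arrow-rs `RowSelector`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Run {
    len: usize,
    skip: bool,
}

/// Append a run to `out`, merging it with the previous run if both are the same kind.
fn push_run(out: &mut Vec<Run>, len: usize, skip: bool) {
    if len == 0 {
        return;
    }
    match out.last_mut() {
        Some(last) if last.skip == skip => last.len += len,
        _ => out.push(Run { len, skip }),
    }
}

/// Apply `inner` (expressed over the rows that `outer` selects) to `outer`.
fn and_then_sketch(outer: &[Run], inner: &[Run]) -> Vec<Run> {
    let mut out = Vec::new();
    let mut inner_iter = inner.iter().copied();
    let mut current = inner_iter.next();

    for run in outer {
        if run.skip {
            // Rows already filtered out stay filtered out; `inner` is not consumed.
            push_run(&mut out, run.len, true);
            continue;
        }
        // Rows selected by `outer`: consume `inner` to decide which ones survive.
        let mut remaining = run.len;
        while remaining > 0 {
            let cur = match current.as_mut() {
                Some(cur) => cur,
                None => {
                    // Assumption of this sketch: an exhausted `inner` means "skip the rest".
                    push_run(&mut out, remaining, true);
                    break;
                }
            };
            let take = remaining.min(cur.len);
            push_run(&mut out, take, cur.skip);
            cur.len -= take;
            remaining -= take;
            let exhausted = cur.len == 0;
            if exhausted {
                current = inner_iter.next();
            }
        }
    }
    out
}
```

Calling `and_then_sketch` with the walkthrough inputs `[Skip 100, Select 50, Skip 50]` and `[Select 10, Skip 40]` yields `[Skip 100, Select 10, Skip 90]`, because the trailing `Skip 40` and `Skip 50` are merged. Each run in either input is visited a bounded number of times, so the work grows linearly with the number of runs rather than with the number of rows.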
+ +Here is an example in code: + +```rust +// Example: Skip 100 rows, then take the next 10 +let a: RowSelection = vec![RowSelector::skip(100), RowSelector::select(50)].into(); +let b: RowSelection = vec![RowSelector::select(10), RowSelector::skip(40)].into(); +let result = a.and_then(&b); +// Result should be: Skip 100, Select 10, Skip 40 +assert_eq!( + Vec::<RowSelector>::from(result), + vec![RowSelector::skip(100), RowSelector::select(10), RowSelector::skip(40)] +); +``` + +This keeps narrowing the filter while touching only lightweight metadata—no data copies. The implementation is a two-pointer linear scan; complexity is linear in selector segments. The sooner predicates shrink the selection, the cheaper later scans become.

Review Comment:

> complexity is linear in selector segments

Add 'the number of'?

> The sooner predicates shrink the selection, the cheaper later scans become.

I had to read this sentence twice. Maybe write this as

> The _more_ predicates shrink the selection, the cheaper later scans become.

At least that's my interpretation: you're trying to say "higher selectivity is better" rather than "earlier filtering is better". Actually you want both of course 😄 In an ideal world the most selective predicates are done first.
