sdd opened a new pull request, #558:
URL: https://github.com/apache/iceberg-rust/pull/558

   This PR introduces two more advanced features that can improve performance when executing predicate-filtered table reads on Parquet-backed tables: "row group filtering" and "row selection skipping".
   
   ## Background
   When performing a predicate-filtered table scan, the best performance is achieved by eliminating as many rows as possible without having to apply the filter predicate to each individual row. Applying a filter predicate to the data itself takes time, and there are techniques we can employ to avoid doing so wherever possible.
   These techniques are applied hierarchically, with the most coarse-grained applied first. Each step eliminates some work for the next by excluding more rows from the result.
   
   At the top level, when we create the file plan for the scan, we use the 
[ManifestEvaluator](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/expr/visitors/manifest_evaluator.rs)
 to reject entire manifest files from the plan, based on the partition field 
summary metadata of each manifest file. Rejecting a manifest file allows us to filter out multiple (often many) data files in a single step.
   
   Once we have our list of remaining selected manifest files, we load them in 
and perform the next layer of filtering. In this step, we use the 
[ExpressionEvaluator](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/expr/visitors/expression_evaluator.rs)
 to apply our filter predicate to the partition summary of each entry in the 
manifest files. Each entry in the manifest file corresponds to a single data 
file, and ExpressionEvaluator allows us to reject individual data files from 
the scan based on their partition summary metadata within each manifest entry.
   
   The final filter step in the file plan stage uses the 
[InclusiveMetricsEvaluator](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs).
 This evaluates our filter predicate against the field-level metrics that are 
stored in the manifest entry. These contain a variety of statistics for each 
field in the data file: the number of nulls, NaNs, and values, and the lower 
and upper bounds for the field (i.e. lowest / highest value for numerical 
columns, or the alphabetically first and last values for string columns). This 
allows us to reject even more of the remaining data files before we've even 
scanned them.
   
   All of this is already in place prior to this PR. The general theme is to use metadata to reject or select chunks of data before we ever touch the data itself. We don't need to stop this process at the data file level, though - Parquet files are not monolithic but are themselves chunked, and they contain two sources of metadata that we can use to reject further chunks of data from our scan before we even look at them.
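
   To make this hierarchy concrete, here is a minimal, runnable sketch that mimics the three metadata-based pruning levels using simplified stand-in types. It is purely illustrative: the real ManifestEvaluator, ExpressionEvaluator, and InclusiveMetricsEvaluator operate on the actual manifest, partition, and metrics structures rather than pre-computed booleans.

   ```rust
   // Simplified stand-ins: each flag represents "could this piece of metadata
   // possibly match the filter predicate?", as decided by the relevant evaluator.
   struct ManifestFile {
       partition_summary_matches: bool, // ManifestEvaluator's verdict
       entries: Vec<ManifestEntry>,
   }
   struct ManifestEntry {
       partition_matches: bool, // ExpressionEvaluator's verdict
       metrics_match: bool,     // InclusiveMetricsEvaluator's verdict
   }

   /// Each level uses only metadata to discard work before the next,
   /// finer-grained level runs; no data file is opened during planning.
   fn plan(manifests: Vec<ManifestFile>) -> Vec<ManifestEntry> {
       manifests
           .into_iter()
           // Level 1: reject whole manifest files via partition field summaries.
           .filter(|mf| mf.partition_summary_matches)
           .flat_map(|mf| mf.entries)
           // Level 2: reject individual data files via the partition data in each entry.
           .filter(|e| e.partition_matches)
           // Level 3: reject data files via field-level metrics
           // (null counts, lower/upper bounds).
           .filter(|e| e.metrics_match)
           .collect()
   }

   fn main() {
       let manifests = vec![ManifestFile {
           partition_summary_matches: true,
           entries: vec![
               ManifestEntry { partition_matches: true, metrics_match: false },
               ManifestEntry { partition_matches: true, metrics_match: true },
           ],
       }];
       println!("{} data file(s) survive metadata pruning", plan(manifests).len());
   }
   ```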
   
   Parquet files split their data into [Row Groups](https://parquet.apache.org/docs/file-format/). The data within each row
group is columnar - i.e. each column is stored separately, one after the other, 
in "Column Chunks". The data for each column chunk in a row group can then be 
further subdivided into pages. The size of each page does not need to be the 
same, either within a column or across columns: i.e., column 1, page 1 does not 
necessarily have the same number of rows as either column 1, page 2 or column 
2, page 1.
   
   The footer of the Parquet file stores metadata about each row group. For 
each row group, there are some statistics that are maintained for every column 
chunk in that row group. These include the min and max value present in that 
column chunk and the number of nulls - similar to the data file metrics that 
get processed by the InclusiveMetricsEvaluator mentioned above.
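
   As a quick way to see the statistics this step relies on, the `parquet` crate exposes them directly from the footer metadata. The sketch below is illustrative and the file path is a placeholder for any Parquet data file:

   ```rust
   use std::fs::File;
   use parquet::file::reader::{FileReader, SerializedFileReader};

   fn main() -> Result<(), Box<dyn std::error::Error>> {
       // Placeholder path: point this at any Parquet data file.
       let file = File::open("data/00000-0-data.parquet")?;
       let reader = SerializedFileReader::new(file)?;

       // The footer metadata is parsed when the file is opened, so inspecting
       // these statistics costs almost nothing extra.
       for (i, row_group) in reader.metadata().row_groups().iter().enumerate() {
           println!("row group {i}: {} rows", row_group.num_rows());
           for column in row_group.columns() {
               // Per-column-chunk statistics: min/max values and null counts,
               // analogous to the data file metrics used by InclusiveMetricsEvaluator.
               println!("  {}: {:?}", column.column_path(), column.statistics());
           }
       }
       Ok(())
   }
   ```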
   
   Optionally (but always for Iceberg Parquet files), the column chunk metadata 
can also contain a [Page 
Index](https://parquet.apache.org/docs/file-format/pageindex/). This extra 
metadata contains further statistics about each page within the column chunk: 
specifically, the min and max value within each page of the chunk, and how many 
null values are in that page. This final level of metadata can be used to 
further filter out rows from each row group that can't match the filter 
predicate.
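
   To illustrate that the Page Index is something a reader has to request explicitly, here is a minimal sketch with the `parquet` crate (again with a placeholder file path) that asks for the page index and checks whether it is present:

   ```rust
   use std::fs::File;
   use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

   fn main() -> Result<(), Box<dyn std::error::Error>> {
       let file = File::open("data/00000-0-data.parquet")?; // placeholder path

       // The page index is only decoded when explicitly requested.
       let options = ArrowReaderOptions::new().with_page_index(true);
       let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
       let metadata = builder.metadata();

       // column_index(): per-page min/max/null-count statistics;
       // offset_index(): byte ranges and row counts for each page.
       println!(
           "page index present: {}",
           metadata.column_index().is_some() && metadata.offset_index().is_some()
       );
       Ok(())
   }
   ```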
   
   
   ## Row Group Filtering
   This PR adds the RowGroupMetricsEvaluator, which applies an Iceberg filter predicate to a Parquet file's row group metadata, as outlined above, in order to reject or select whole row groups within each data file for the scan. The
[ArrowReader](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/arrow/reader.rs#L97)
 uses this within calls to `read`, passing the list of selected row group 
indices to 
[ParquetRecordBatchStreamBuilder::with_row_groups](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#tymethod.with_row_groups).
   
   Since the row group metadata is always parsed as part of reading each Parquet file anyway, this filtering adds very little overhead. However, it is only useful if your table's data files can contain more than one row group each. The default maximum row group size for Iceberg Parquet writes is 128 MB (configured with the `write.parquet.row-group-size-bytes` [table setting](https://iceberg.apache.org/docs/latest/configuration/#write-properties)). If your data files are smaller than the row group size, Row Group Filtering will provide no performance improvement. This PR enables Row Group Filtering by default, but it can be turned off for a scan by calling `with_row_group_filtering_enabled(false)` when building the scan (there's a corresponding method on the ArrowReaderBuilder if you are using that directly). You may want to do this to eliminate the small but non-zero overhead of the filtering, or when experimenting to see what impact it makes.
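
   For example, disabling Row Group Filtering on a single scan in order to compare timings might look roughly like the sketch below. The `with_row_group_filtering_enabled` call is the option this PR adds; the column name, predicate, and surrounding scan-builder calls are illustrative assumptions about how the scan is set up rather than prescriptive:

   ```rust
   use futures::TryStreamExt;
   use iceberg::expr::Reference;
   use iceberg::spec::Datum;
   use iceberg::table::Table;

   // Sketch: run a filtered scan with row group filtering switched off, e.g. to
   // compare timings against the default behaviour. "amount" is a placeholder column.
   async fn scan_without_row_group_filtering(table: &Table) -> iceberg::Result<usize> {
       let scan = table
           .scan()
           .with_filter(Reference::new("amount").less_than(Datum::long(100)))
           .with_row_group_filtering_enabled(false) // defaults to true after this PR
           .build()?;

       // Collect the Arrow record batches and count the rows that survived the scan.
       let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
       Ok(batches.iter().map(|b| b.num_rows()).sum())
   }
   ```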
   
   ## Row Selection Skipping
   Additionally, this PR adds the PageIndexEvaluator, which applies an Iceberg filter predicate to a Parquet Page Index. This allows entire slices of rows within a row group to be skipped - neither fetched into memory nor decoded - when the PageIndexEvaluator determines from the Page Index that they can't match the filter. The PageIndexEvaluator builds a RowSelection that it passes to [ParquetRecordBatchStreamBuilder::with_row_selection](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.with_row_selection).
   
   Unlike the row group metadata, the Page Index is not parsed by default when reading in the Parquet file metadata. When present, especially for files with many columns, it can be quite large and expensive to parse. In fact, in experiments with the performance testing suite from [my other PR](https://github.com/apache/iceberg-rust/pull/497), the additional time required to parse the Page Index often outweighs the time saved by skipping the fetching, decoding, and filtering of rows within a row group. For this reason, I've kept Row Selection Skipping disabled by default. It can be enabled by calling `with_row_selection_enabled(true)` when building the scan (there's a corresponding method on the ArrowReaderBuilder if you are using that directly). If you're interested in this feature, you may want to experiment with the `write.parquet.page-size-bytes` and `write.parquet.page-row-limit` [table settings](https://iceberg.apache.org/docs/latest/configuration/#write-properties) (which default to 1 MB and 20,000 rows respectively), as well as the sort settings for your table, in order to get optimal performance.
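
   Opting in looks much the same as the previous sketch, just with the row-selection option enabled. Again, the column name and predicate are placeholders, and only `with_row_selection_enabled` is the behaviour introduced by this PR:

   ```rust
   use futures::TryStreamExt;
   use iceberg::expr::Reference;
   use iceberg::spec::Datum;
   use iceberg::table::Table;

   // Sketch: opt in to row selection skipping for a selective scan, where the
   // page index is most likely to pay for its parsing cost. "event_ts" is a
   // placeholder column name.
   async fn scan_with_row_selection(table: &Table) -> iceberg::Result<usize> {
       let scan = table
           .scan()
           .with_filter(Reference::new("event_ts").less_than(Datum::long(1_000_000)))
           .with_row_selection_enabled(true) // off by default
           .build()?;

       let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
       Ok(batches.iter().map(|b| b.num_rows()).sum())
   }
   ```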
   
   ## Future Directions
   The overhead of parsing the Page Index could be offset by caching a parsed 
Page Index for each data file. It would be interesting to see if this has 
enough of an effect to increase the utility of Row Selection Skipping. I will 
experiment with adding Page Index caching to my [Object Cache 
PR](https://github.com/apache/iceberg-rust/pull/512) to see how this goes.
   
   Additionally, the page index is an array, with one index per column. It 
seems like it would be possible to only store page index entries for some 
arbitrary left-aligned slice of columns - i.e., columns 0 to m for a file with 
n columns, where m < n. In conjunction with a specific column order, this could be used to store a page index only for the columns that are useful to filter on, saving space and potentially reducing the parsing overhead.

