sdd opened a new pull request, #558:
URL: https://github.com/apache/iceberg-rust/pull/558
This PR introduces two more advanced features that can improve the performance of predicate-filtered table reads on parquet-backed tables: "row group filtering" and "row selection skipping".

## Background

When performing a predicate-filtered table scan, the best performance is achieved by eliminating as many rows as possible without having to apply the filter predicate to each individual row. Applying a filter predicate to the data itself takes time, and there are techniques that we can employ in order to avoid this. These techniques are applied hierarchically, with the most coarse-grained applying first. Each step eliminates some work for the next step by excluding more rows from the result.

At the top level, when we create the file plan for the scan, we use the [ManifestEvaluator](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/expr/visitors/manifest_evaluator.rs) to reject entire manifest files from the plan, based on the partition field summary metadata of each manifest file. Rejecting a manifest file allows us to filter out multiple, often many, data files in a single step.

Once we have our list of remaining selected manifest files, we load them in and perform the next layer of filtering. In this step, we use the [ExpressionEvaluator](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/expr/visitors/expression_evaluator.rs) to apply our filter predicate to the partition summary of each entry in the manifest files. Each entry in a manifest file corresponds to a single data file, and ExpressionEvaluator allows us to reject individual data files from the scan based on the partition summary metadata within each manifest entry.

The final filter step in the file plan stage uses the [InclusiveMetricsEvaluator](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs). This evaluates our filter predicate against the field-level metrics that are stored in the manifest entry. These contain a variety of statistics for each field in the data file: the number of nulls, NaNs, and values, and the lower and upper bounds for the field (i.e. the lowest / highest value for numerical columns, or the alphabetically first and last values for string columns). This allows us to reject even more of the remaining data files before we've even scanned them.

All of this is already in place prior to this PR. The general theme is to use metadata in order to reject or select chunks of data before we even get to the data itself. We don't need to stop this process at the data file level though - Parquet files are not monolithic, but chunked themselves, and contain two sources of metadata that we can use to further reject chunks of data from our scan before we even look at it.

Parquet files split the data within into [Row Groups](https://parquet.apache.org/docs/file-format/). The data within each row group is columnar - i.e. each column is stored separately, one after the other, in "Column Chunks". The data for each column chunk in a row group can then be further subdivided into pages. The size of each page does not need to be the same, either within a column or across columns: i.e., column 1, page 1 does not necessarily have the same number of rows as either column 1, page 2 or column 2, page 1. The footer of the Parquet file stores metadata about each row group.
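As a rough illustration of what lives in that footer (this is plain parquet crate usage rather than code from this PR, and the file path is made up), the per-row-group metadata can be dumped along these lines:

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Sketch only: print the row group metadata stored in a parquet footer.
fn inspect_row_groups(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    for (i, row_group) in reader.metadata().row_groups().iter().enumerate() {
        println!("row group {i}: {} rows", row_group.num_rows());
        for column_chunk in row_group.columns() {
            // Each column chunk optionally carries min/max and null-count statistics.
            println!(
                "  column {}: {:?}",
                column_chunk.column_path(),
                column_chunk.statistics()
            );
        }
    }
    Ok(())
}
```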
For each row group, there are some statistics that are maintained for every column chunk in that row group. These include the min and max value present in that column chunk and the number of nulls - similar to the data file metrics that get processed by the InclusiveMetricsEvaluator mentioned above.

Optionally (but always for Iceberg Parquet files), the column chunk metadata can also contain a [Page Index](https://parquet.apache.org/docs/file-format/pageindex/). This extra metadata contains further statistics about each page within the column chunk: specifically, the min and max value within each page of the chunk, and how many null values are in that page. This final level of metadata can be used to further filter out rows from each row group that can't match the filter predicate.

## Row Group Filtering

This PR adds the RowGroupMetrics evaluator, which applies an Iceberg filter predicate to a parquet file's row group metadata as outlined above to reject or select whole row groups within the data file from the scan. The [ArrowReader](https://github.com/apache/iceberg-rust/blob/fcf14ca2373a13181d211e6f431f2f24f8bb6dd8/crates/iceberg/src/arrow/reader.rs#L97) uses this within calls to `read`, passing the list of selected row group indices to [ParquetRecordBatchStreamBuilder::with_row_groups](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#tymethod.with_row_groups).

Since the row group metadata is always parsed anyway as part of the process of reading each parquet file, this filtering adds little overhead. However, it is only useful if your table's data files can contain more than one row group. The default max row group size for Iceberg parquet writing is 128 MB (configured using the `write.parquet.row-group-size-bytes` [table setting](https://iceberg.apache.org/docs/latest/configuration/#write-properties)). If your data files are smaller than the row group size, then Row Group Filtering will provide no performance improvement.

This PR makes Row Group Filtering enabled by default, but it can be turned off for a scan by calling `with_row_group_filtering_enabled(false)` when building the scan (there's a corresponding method on `ArrowReaderBuilder` if you are using that directly). You may want to do this to eliminate the small but non-zero overhead of this filtering, or when experimenting to see the impact that this filtering can make.

## Row Selection Skipping

Additionally, this PR adds the PageIndexEvaluator, which applies an Iceberg filter predicate to a parquet Page Index. This allows entire slices of rows within a row group to be skipped - neither fetched into memory nor decoded - if the PageIndexEvaluator determines from the Page Index that they can't match the filter. The PageIndexEvaluator builds a RowSelection that it passes to [ParquetRecordBatchStreamBuilder::with_row_selection](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.with_row_selection).

Unlike the row group metadata, the Page Index is not parsed by default when reading in the parquet file metadata. When present, especially for files with many columns, it can be quite large and intensive to parse. In fact, during experiments on the performance testing suite in [my other PR](https://github.com/apache/iceberg-rust/pull/497), it turned out that the additional time required to parse the Page Index often outweighs the time saved by not having to fetch, decode, and filter all of the rows within a row group. For this reason, I've kept Row Selection Skipping disabled by default.
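To make the mechanism concrete, this is roughly the shape of the parquet API that the reader feeds these results into (the file name, row group indices, and selector lengths below are made up for illustration; this is not code from this PR):

```rust
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

// Sketch only: hand pre-computed row group indices and a row selection to the
// parquet crate, so that skipped row groups / row ranges are never decoded.
async fn read_with_pruning(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let builder = ParquetRecordBatchStreamBuilder::new(File::open(path).await?).await?;

    // e.g. row-group-level evaluation decided only row groups 0 and 2 can match.
    let row_groups = vec![0, 2];

    // e.g. page-level evaluation decided that, of the rows in those row groups,
    // only rows 20_000..40_000 can match; everything else is skipped.
    let selection = RowSelection::from(vec![
        RowSelector::skip(20_000),
        RowSelector::select(20_000),
        RowSelector::skip(60_000),
    ]);

    let batches: Vec<_> = builder
        .with_row_groups(row_groups)
        .with_row_selection(selection)
        .build()?
        .try_collect()
        .await?;

    println!("read {} record batches after pruning", batches.len());
    Ok(())
}
```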
Row Selection Skipping can be enabled by calling `with_row_selection_enabled(true)` when building the scan (there's a corresponding method on `ArrowReaderBuilder` if you are using that directly); see the example at the end of this description. If you're interested in this feature, you may want to experiment with the `write.parquet.page-size-bytes` and `write.parquet.page-row-limit` [table settings](https://iceberg.apache.org/docs/latest/configuration/#write-properties) (which default to 1 MB and 20,000 rows respectively) in order to get optimal performance, as well as the sort settings for your table.

## Future Directions

The overhead of parsing the Page Index could be offset by caching a parsed Page Index for each data file. It would be interesting to see if this has enough of an effect to increase the utility of Row Selection Skipping. I will experiment with adding Page Index caching to my [Object Cache PR](https://github.com/apache/iceberg-rust/pull/512) to see how this goes.

Additionally, the page index is an array, with one index per column. It seems like it would be possible to only store page index entries for some arbitrary left-aligned slice of columns - i.e., columns 0 to m for a file with n columns, where m < n. This could be taken advantage of, in conjunction with a specific column order, to only store a page index for the columns that it would be useful to filter on, saving space and potentially reducing the parsing overhead.
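## Example

To show how the features above are used from the scan API, here's a rough sketch. The table handle, column name (`amount`), and threshold are made up for illustration; only the two `with_*_enabled` toggles are new in this PR, the rest is the existing scan builder API:

```rust
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;

// Sketch only: a filtered scan with both features configured explicitly.
// The column name "amount" and the threshold are hypothetical.
async fn filtered_scan(table: &Table) -> iceberg::Result<usize> {
    let scan = table
        .scan()
        .select(["amount"])
        // Only row groups / row ranges that can possibly match this predicate
        // will be fetched and decoded.
        .with_filter(Reference::new("amount").greater_than(Datum::long(1_000)))
        // Row Group Filtering is on by default; shown here for clarity.
        .with_row_group_filtering_enabled(true)
        // Row Selection Skipping is off by default; opt in per scan.
        .with_row_selection_enabled(true)
        .build()?;

    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    Ok(batches.iter().map(|batch| batch.num_rows()).sum())
}
```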