This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 04f217b670 Speed up Parquet filter pushdown v4 (Predicate evaluation
cache for async_reader) (#7850)
04f217b670 is described below
commit 04f217b6708eed2804c3b0a669a65ea111c2c5f1
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Fri Aug 8 13:31:57 2025 -0500
Speed up Parquet filter pushdown v4 (Predicate evaluation cache for
async_reader) (#7850)
This is my latest attempt to make pushdown faster. Prior art: #6921
cc @alamb @zhuqi-lucas
- Part of https://github.com/apache/arrow-rs/issues/8000
- Related to https://github.com/apache/datafusion/issues/3463
- Related to https://github.com/apache/arrow-rs/issues/7456
- Closes https://github.com/apache/arrow-rs/issues/7363
- Closes https://github.com/apache/arrow-rs/pull/8003
## Problems of #6921
1. It proactively loads the entire row group into memory, rather than only
loading the pages that pass the filter predicate.
2. It only caches decompressed pages, so the decoding cost is still paid
twice.
This PR takes a different approach: it does not change the decoding
pipeline, which avoids problem 1, and it caches the decoded arrow record
batches, which avoids problem 2.
The trade-off is that caching decoded batches requires more memory.
## How it works
1. It instruments the `array_readers` with a transparent
`cached_array_reader`.
2. The cache layer first consults the `RowGroupCache` for a batch, and
only reads from the underlying reader on a cache miss.
3. There are cache producers and cache consumers. The producer inserts
arrow arrays into the cache while building filters; the consumer removes
arrow arrays from the cache while building outputs. The memory usage
should therefore look like this:
```
▲
│ ╭─╮
│ ╱ ╲
│ ╱ ╲
│ ╱ ╲
│ ╱ ╲
│╱ ╲
└─────────────╲──────► Time
│ │ │
Filter Peak Consume
Phase (Built) (Decrease)
```
In a concurrent setup, not all readers reach the peak at the same time,
so the peak system memory usage may be lower.
4. It has a `max_cache_size` knob, a per-row-group setting. Once a row
group has used up its budget, the cache stops taking new data and
`cached_array_reader` falls back to reading and decoding from Parquet.
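The producer/consumer flow with a per-row-group byte budget described above can be sketched with plain standard-library types. This is a simplified illustration, not the actual `RowGroupCache` from this PR: the type and its fields here are hypothetical, keys are `(column index, batch id)` pairs, and `i32` vectors stand in for decoded arrow arrays.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for a per-row-group cache: keyed by
/// (column index, batch id) and bounded by a byte budget.
struct RowGroupCacheSketch {
    entries: HashMap<(usize, usize), Arc<Vec<i32>>>,
    used_bytes: usize,
    max_bytes: usize,
}

impl RowGroupCacheSketch {
    fn new(max_bytes: usize) -> Self {
        Self { entries: HashMap::new(), used_bytes: 0, max_bytes }
    }

    /// Producer path: insert decoded data unless the budget is exhausted.
    /// Returns false on a budget miss; the caller then falls back to
    /// decoding from Parquet again later.
    fn insert(&mut self, col: usize, batch: usize, data: Arc<Vec<i32>>) -> bool {
        let size = data.len() * std::mem::size_of::<i32>();
        if self.used_bytes + size > self.max_bytes {
            return false; // budget exhausted: cache stops taking new data
        }
        self.used_bytes += size;
        self.entries.insert((col, batch), data);
        true
    }

    /// Consumer path: take the batch out, releasing its share of the budget.
    fn remove(&mut self, col: usize, batch: usize) -> Option<Arc<Vec<i32>>> {
        let data = self.entries.remove(&(col, batch))?;
        self.used_bytes -= data.len() * std::mem::size_of::<i32>();
        Some(data)
    }
}

fn main() {
    let cache = Arc::new(Mutex::new(RowGroupCacheSketch::new(16)));
    // Producer (filter phase) caches a decoded batch for column 0.
    assert!(cache.lock().unwrap().insert(0, 0, Arc::new(vec![1, 2, 3])));
    // A second batch would exceed the 16-byte budget and is rejected.
    assert!(!cache.lock().unwrap().insert(0, 1, Arc::new(vec![4, 5])));
    // Consumer (output phase) drains the cached batch, freeing budget.
    let batch = cache.lock().unwrap().remove(0, 0).unwrap();
    assert_eq!(*batch, vec![1, 2, 3]);
    assert_eq!(cache.lock().unwrap().used_bytes, 0);
    println!("ok");
}
```

The rise-then-fall memory curve in the diagram falls out of this shape: inserts during the filter phase grow `used_bytes` to a peak, and consumer removals during output building shrink it back down.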
## Other benefits
1. This architecture can support nested columns (not implemented in this
PR), i.e., it is future proof.
2. There are many optimizations left to further squeeze out performance,
but even in its current state it shows no regressions.
## How does it perform?
My criterion setup somehow wouldn't produce a result from
`--save-baseline`, so I asked an LLM to generate a table from this
benchmark:
```
cargo bench --bench arrow_reader_clickbench --features "arrow async" "async"
```
`Baseline` is the current implementation on the main branch.
`New Unlimited` is the new pushdown with an unlimited memory budget.
`New 100MB` is the new pushdown with a 100MB per-row-group cache budget.
```
Query | Baseline (ms) | New Unlimited (ms) | Diff (ms) | New 100MB (ms) | Diff (ms)
-------+---------------+--------------------+-----------+----------------+-----------
Q1     | 0.847         | 0.803              | -0.044    | 0.812          | -0.035
Q10    | 4.060         | 6.273              | +2.213    | 6.216          | +2.156
Q11    | 5.088         | 7.152              | +2.064    | 7.193          | +2.105
Q12    | 18.485        | 14.937             | -3.548    | 14.904         | -3.581
Q13    | 24.859        | 21.908             | -2.951    | 21.705         | -3.154
Q14    | 23.994        | 20.691             | -3.303    | 20.467         | -3.527
Q19    | 1.894         | 1.980              | +0.086    | 1.996          | +0.102
Q20    | 90.325        | 64.689             | -25.636   | 74.478         | -15.847
Q21    | 106.610       | 74.766             | -31.844   | 99.557         | -7.053
Q22    | 232.730       | 101.660            | -131.070  | 204.800        | -27.930
Q23    | 222.800       | 186.320            | -36.480   | 186.590        | -36.210
Q24    | 24.840        | 19.762             | -5.078    | 19.908         | -4.932
Q27    | 80.463        | 47.118             | -33.345   | 49.597         | -30.866
Q28    | 78.999        | 47.583             | -31.416   | 51.432         | -27.567
Q30    | 28.587        | 28.710             | +0.123    | 28.926         | +0.339
Q36    | 80.157        | 57.954             | -22.203   | 58.012         | -22.145
Q37    | 46.962        | 45.901             | -1.061    | 45.386         | -1.576
Q38    | 16.324        | 16.492             | +0.168    | 16.522         | +0.198
Q39    | 20.754        | 20.734             | -0.020    | 20.648         | -0.106
Q40    | 22.554        | 21.707             | -0.847    | 21.995         | -0.559
Q41    | 16.430        | 16.391             | -0.039    | 16.581         | +0.151
Q42    | 6.045         | 6.157              | +0.112    | 6.120          | +0.075
```
1. If we consider diffs within 5ms to be noise, we are never worse than
the current implementation.
2. We see significant improvements for string-heavy queries: string
columns are large, so they take time to decompress and decode.
3. The 100MB cache budget appears to have only a small performance impact.
## Limitations
1. It only works for async readers, because the sync reader does not
follow the same row-group-by-row-group structure.
2. It is memory hungry compared to #6921. But changing the decoding
pipeline without eagerly loading the entire row group would require
significant changes to the current decoding infrastructure, e.g., making
the page iterator an async function.
3. It doesn't yet support nested columns; more specifically, it doesn't
support nested columns with nullable parents. Supporting them is
straightforward and requires no big changes.
4. The current memory accounting is not accurate: it overestimates memory
usage, especially when reading string view arrays, where multiple string
views may share the same underlying buffer and that buffer's size is
counted more than once. Either way, we never exceed the user-configured
memory limit.
5. If one row passes the filter, the entire batch is cached. We can
probably optimize this later.
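Limitation 4 (overestimation for shared buffers) can be illustrated with a std-only sketch. This is not the actual arrow accounting code: `ViewArraySketch` and its naive `accounted_bytes` are hypothetical stand-ins for string-view arrays whose views reference one shared data buffer.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a string-view array: many arrays may
/// reference the same shared data buffer.
struct ViewArraySketch {
    buffer: Arc<Vec<u8>>,
}

impl ViewArraySketch {
    /// Naive per-array accounting: charges the full buffer size to every
    /// array that references it, even when the buffer is shared.
    fn accounted_bytes(&self) -> usize {
        self.buffer.len()
    }
}

fn main() {
    let shared = Arc::new(vec![0u8; 1024]); // one 1 KiB buffer on the heap
    let a = ViewArraySketch { buffer: Arc::clone(&shared) };
    let b = ViewArraySketch { buffer: Arc::clone(&shared) };

    // Actual heap usage is 1024 bytes, but summing per-array sizes
    // counts the shared buffer twice. The estimate only ever errs high,
    // so the cache stays within the user-configured budget.
    let accounted = a.accounted_bytes() + b.accounted_bytes();
    assert_eq!(accounted, 2048);
    assert_eq!(shared.len(), 1024);
    println!("accounted = {accounted} bytes, actual = {} bytes", shared.len());
}
```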
## Next steps?
This PR is largely a proof of concept; I want to collect some feedback
before sending a multi-thousand-line PR :)
Some items I can think of:
1. Design an interface for users to specify the cache size limit; it is
currently hard-coded.
2. Don't instrument the nested array reader if the parquet file has a
nullable parent; currently it will panic.
3. More testing, and integration tests/benchmarks with DataFusion.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/array_reader/builder.rs | 94 ++-
.../src/arrow/array_reader/cached_array_reader.rs | 762 +++++++++++++++++++++
parquet/src/arrow/array_reader/list_array.rs | 4 +-
parquet/src/arrow/array_reader/mod.rs | 5 +-
parquet/src/arrow/array_reader/row_group_cache.rs | 206 ++++++
parquet/src/arrow/arrow_reader/metrics.rs | 135 ++++
parquet/src/arrow/arrow_reader/mod.rs | 117 +++-
parquet/src/arrow/arrow_reader/selection.rs | 53 ++
parquet/src/arrow/async_reader/mod.rs | 234 ++++++-
parquet/src/arrow/mod.rs | 7 +
parquet/tests/arrow_reader/mod.rs | 2 +
parquet/tests/arrow_reader/predicate_cache.rs | 279 ++++++++
12 files changed, 1869 insertions(+), 29 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 6dcf05ccf8..d5e36fbcb4 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -15,18 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use arrow_schema::{DataType, Fields, SchemaBuilder};
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
+use crate::arrow::array_reader::cached_array_reader::CacheRole;
+use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
+use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroups, StructArrayReader,
};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
@@ -34,14 +38,74 @@ use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, In
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
+/// Builder for [`CacheOptions`]
+#[derive(Debug, Clone)]
+pub struct CacheOptionsBuilder<'a> {
+ /// Projection mask to apply to the cache
+ pub projection_mask: &'a ProjectionMask,
+ /// Cache to use for storing row groups
+ pub cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl<'a> CacheOptionsBuilder<'a> {
+ /// create a new cache options builder
+ pub fn new(projection_mask: &'a ProjectionMask, cache: Arc<Mutex<RowGroupCache>>) -> Self {
+ Self {
+ projection_mask,
+ cache,
+ }
+ }
+
+ /// Return a new [`CacheOptions`] for producing (populating) the cache
+ pub fn producer(self) -> CacheOptions<'a> {
+ CacheOptions {
+ projection_mask: self.projection_mask,
+ cache: self.cache,
+ role: CacheRole::Producer,
+ }
+ }
+
+ /// return a new [`CacheOptions`] for consuming (reading) the cache
+ pub fn consumer(self) -> CacheOptions<'a> {
+ CacheOptions {
+ projection_mask: self.projection_mask,
+ cache: self.cache,
+ role: CacheRole::Consumer,
+ }
+ }
+}
+
+/// Cache options containing projection mask, cache, and role
+#[derive(Clone)]
+pub struct CacheOptions<'a> {
+ pub projection_mask: &'a ProjectionMask,
+ pub cache: Arc<Mutex<RowGroupCache>>,
+ pub role: CacheRole,
+}
+
/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
pub struct ArrayReaderBuilder<'a> {
+ /// Source of row group data
row_groups: &'a dyn RowGroups,
+ /// Optional cache options for the array reader
+ cache_options: Option<&'a CacheOptions<'a>>,
+ /// metrics
+ metrics: &'a ArrowReaderMetrics,
}
impl<'a> ArrayReaderBuilder<'a> {
- pub fn new(row_groups: &'a dyn RowGroups) -> Self {
- Self { row_groups }
+ pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
+ Self {
+ row_groups,
+ cache_options: None,
+ metrics,
+ }
+ }
+
+ /// Add cache options to the builder
+ pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
+ self.cache_options = cache_options;
+ self
}
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -69,7 +133,26 @@ impl<'a> ArrayReaderBuilder<'a> {
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
- ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
+ ParquetFieldType::Primitive { col_idx, .. } => {
+ let Some(reader) = self.build_primitive_reader(field, mask)? else {
+ return Ok(None);
+ };
+ let Some(cache_options) = self.cache_options.as_ref() else {
+ return Ok(Some(reader));
+ };
+
+ if cache_options.projection_mask.leaf_included(col_idx) {
+ Ok(Some(Box::new(CachedArrayReader::new(
+ reader,
+ Arc::clone(&cache_options.cache),
+ col_idx,
+ cache_options.role,
+ self.metrics.clone(), // cheap clone
+ ))))
+ } else {
+ Ok(Some(reader))
+ }
+ }
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => self.build_map_reader(field, mask),
DataType::Struct(_) => self.build_struct_reader(field, mask),
@@ -375,7 +458,8 @@ mod tests {
)
.unwrap();
- let array_reader = ArrayReaderBuilder::new(&file_reader)
+ let metrics = ArrowReaderMetrics::disabled();
+ let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();
diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs
new file mode 100644
index 0000000000..0e837782fa
--- /dev/null
+++ b/parquet/src/arrow/array_reader/cached_array_reader.rs
@@ -0,0 +1,762 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`CachedArrayReader`] wrapper around [`ArrayReader`]
+
+use crate::arrow::array_reader::row_group_cache::BatchID;
+use crate::arrow::array_reader::{row_group_cache::RowGroupCache, ArrayReader};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::errors::Result;
+use arrow_array::{new_empty_array, ArrayRef, BooleanArray};
+use arrow_buffer::BooleanBufferBuilder;
+use arrow_schema::DataType as ArrowType;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+/// Role of the cached array reader
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CacheRole {
+ /// Producer role: inserts data into the cache during filter phase
+ Producer,
+ /// Consumer role: removes consumed data from the cache during output building phase
+ Consumer,
+}
+
+/// A cached wrapper around an ArrayReader that avoids duplicate decoding
+/// when the same column appears in both filter predicates and output projection.
+///
+/// This reader acts as a transparent layer over the inner reader, using a cache
+/// to avoid redundant work when the same data is needed multiple times.
+///
+/// The reader can operate in two roles:
+/// - Producer: During filter phase, inserts decoded data into the cache
+/// - Consumer: During output building, consumes and removes data from the cache
+///
+/// This means the memory consumption of the cache has two stages:
+/// 1. During the filter phase, the memory increases as the cache is populated
+/// 2. It peaks when filters are built.
+/// 3. It decreases as the cached data is consumed.
+///
+/// ```text
+/// ▲
+/// │ ╭─╮
+/// │ ╱ ╲
+/// │ ╱ ╲
+/// │ ╱ ╲
+/// │ ╱ ╲
+/// │╱ ╲
+/// └─────────────╲──────► Time
+/// │ │ │
+/// Filter Peak Consume
+/// Phase (Built) (Decrease)
+/// ```
+pub struct CachedArrayReader {
+ /// The underlying array reader
+ inner: Box<dyn ArrayReader>,
+ /// Shared cache for this row group
+ shared_cache: Arc<Mutex<RowGroupCache>>,
+ /// Column index for cache key generation
+ column_idx: usize,
+ /// Current logical position in the data stream for this reader (for cache key generation)
+ outer_position: usize,
+ /// Current position in `inner`
+ inner_position: usize,
+ /// Batch size for the cache
+ batch_size: usize,
+ /// Boolean buffer builder to track selections for the next consume_batch()
+ selections: BooleanBufferBuilder,
+ /// Role of this reader (Producer or Consumer)
+ role: CacheRole,
+ /// Local cache to store batches between read_records and consume_batch calls
+ /// This ensures data is available even if the shared cache evicts items
+ local_cache: HashMap<BatchID, ArrayRef>,
+ /// Statistics to report on the Cache behavior
+ metrics: ArrowReaderMetrics,
+}
+
+impl CachedArrayReader {
+ /// Creates a new cached array reader with the specified role
+ pub fn new(
+ inner: Box<dyn ArrayReader>,
+ cache: Arc<Mutex<RowGroupCache>>,
+ column_idx: usize,
+ role: CacheRole,
+ metrics: ArrowReaderMetrics,
+ ) -> Self {
+ let batch_size = cache.lock().unwrap().batch_size();
+
+ Self {
+ inner,
+ shared_cache: cache,
+ column_idx,
+ outer_position: 0,
+ inner_position: 0,
+ batch_size,
+ selections: BooleanBufferBuilder::new(0),
+ role,
+ local_cache: HashMap::new(),
+ metrics,
+ }
+ }
+
+ fn get_batch_id_from_position(&self, row_id: usize) -> BatchID {
+ BatchID {
+ val: row_id / self.batch_size,
+ }
+ }
+
+ /// Loads the batch with the given ID (first row offset) from the inner
+ /// reader
+ ///
+ /// After this call the required batch will be available in
+ /// `self.local_cache` and may also be stored in `self.shared_cache`.
+ ///
+ fn fetch_batch(&mut self, batch_id: BatchID) -> Result<usize> {
+ let first_row_offset = batch_id.val * self.batch_size;
+ if self.inner_position < first_row_offset {
+ let to_skip = first_row_offset - self.inner_position;
+ let skipped = self.inner.skip_records(to_skip)?;
+ assert_eq!(skipped, to_skip);
+ self.inner_position += skipped;
+ }
+
+ let read = self.inner.read_records(self.batch_size)?;
+
+ // If there are no remaining records (EOF), return immediately without
+ // attempting to cache an empty batch. This prevents inserting zero-length
+ // arrays into the cache which can later cause panics when slicing.
+ if read == 0 {
+ return Ok(0);
+ }
+
+ let array = self.inner.consume_batch()?;
+
+ // Store in both shared cache and local cache
+ // The shared cache is used to reuse results between readers
+ // The local cache ensures data is available for our consume_batch call
+ let _cached =
+ self.shared_cache
+ .lock()
+ .unwrap()
+ .insert(self.column_idx, batch_id, array.clone());
+ // Note: if the shared cache is full (_cached == false), we continue without caching
+ // The local cache will still store the data for this reader's use
+
+ self.local_cache.insert(batch_id, array);
+
+ self.inner_position += read;
+ Ok(read)
+ }
+
+ /// Remove batches from cache that have been completely consumed
+ /// This is only called for Consumer role readers
+ fn cleanup_consumed_batches(&mut self) {
+ let current_batch_id = self.get_batch_id_from_position(self.outer_position);
+
+ // Remove batches that are at least one batch behind the current position
+ // This ensures we don't remove batches that might still be needed for the current batch
+ // We can safely remove batch_id if current_batch_id > batch_id + 1
+ if current_batch_id.val > 1 {
+ let mut cache = self.shared_cache.lock().unwrap();
+ for batch_id_to_remove in 0..(current_batch_id.val - 1) {
+ cache.remove(
+ self.column_idx,
+ BatchID {
+ val: batch_id_to_remove,
+ },
+ );
+ }
+ }
+ }
+}
+
+impl ArrayReader for CachedArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ self.inner.get_data_type()
+ }
+
+ fn read_records(&mut self, num_records: usize) -> Result<usize> {
+ let mut read = 0;
+ while read < num_records {
+ let batch_id = self.get_batch_id_from_position(self.outer_position);
+
+ // Check local cache first
+ let cached = if let Some(array) = self.local_cache.get(&batch_id) {
+ Some(array.clone())
+ } else {
+ // If not in local cache, i.e., we are consumer, check shared cache
+ let cache_content = self
+ .shared_cache
+ .lock()
+ .unwrap()
+ .get(self.column_idx, batch_id);
+ if let Some(array) = cache_content.as_ref() {
+ // Store in local cache for later use in consume_batch
+ self.local_cache.insert(batch_id, array.clone());
+ }
+ cache_content
+ };
+
+ match cached {
+ Some(array) => {
+ let array_len = array.len();
+ if array_len + batch_id.val * self.batch_size > self.outer_position {
+ // the cache batch has some records that we can select
+ let v = array_len + batch_id.val * self.batch_size - self.outer_position;
+ let select_cnt = std::cmp::min(num_records - read, v);
+ read += select_cnt;
+ self.metrics.increment_cache_reads(select_cnt);
+ self.outer_position += select_cnt;
+ self.selections.append_n(select_cnt, true);
+ } else {
+ // this is last batch and we have used all records from it
+ break;
+ }
+ }
+ None => {
+ let read_from_inner = self.fetch_batch(batch_id)?;
+ // Reached end-of-file, no more records to read
+ if read_from_inner == 0 {
+ break;
+ }
+ self.metrics.increment_inner_reads(read_from_inner);
+ let select_from_this_batch = std::cmp::min(
+ num_records - read,
+ self.inner_position - self.outer_position,
+ );
+ read += select_from_this_batch;
+ self.outer_position += select_from_this_batch;
+ self.selections.append_n(select_from_this_batch, true);
+ if read_from_inner < self.batch_size {
+ // this is last batch from inner reader
+ break;
+ }
+ }
+ }
+ }
+ Ok(read)
+ }
+
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let mut skipped = 0;
+ while skipped < num_records {
+ let size = std::cmp::min(num_records - skipped, self.batch_size);
+ skipped += size;
+ self.selections.append_n(size, false);
+ self.outer_position += size;
+ }
+ Ok(num_records)
+ }
+
+ fn consume_batch(&mut self) -> Result<ArrayRef> {
+ let row_count = self.selections.len();
+ if row_count == 0 {
+ return Ok(new_empty_array(self.inner.get_data_type()));
+ }
+
+ let start_position = self.outer_position - row_count;
+
+ let selection_buffer = self.selections.finish();
+
+ let start_batch = start_position / self.batch_size;
+ let end_batch = (start_position + row_count - 1) / self.batch_size;
+
+ let mut selected_arrays = Vec::new();
+ for batch_id in start_batch..=end_batch {
+ let batch_start = batch_id * self.batch_size;
+ let batch_end = batch_start + self.batch_size - 1;
+ let batch_id = self.get_batch_id_from_position(batch_start);
+
+ // Calculate the overlap between the start_position and the batch
+ let overlap_start = start_position.max(batch_start);
+ let overlap_end = (start_position + row_count - 1).min(batch_end);
+
+ if overlap_start > overlap_end {
+ continue;
+ }
+
+ let selection_start = overlap_start - start_position;
+ let selection_length = overlap_end - overlap_start + 1;
+ let mask = selection_buffer.slice(selection_start, selection_length);
+
+ if mask.count_set_bits() == 0 {
+ continue;
+ }
+
+ let mask_array = BooleanArray::from(mask);
+ // Read from local cache instead of shared cache to avoid cache eviction issues
+ let cached = self
+ .local_cache
+ .get(&batch_id)
+ .expect("data must be already cached in the read_records call, this is a bug");
+ let cached = cached.slice(overlap_start - batch_start, selection_length);
+ let filtered = arrow_select::filter::filter(&cached, &mask_array)?;
+ selected_arrays.push(filtered);
+ }
+
+ self.selections = BooleanBufferBuilder::new(0);
+
+ // Only remove batches from local buffer that are completely behind current position
+ // Keep the current batch and any future batches as they might still be needed
+ let current_batch_id = self.get_batch_id_from_position(self.outer_position);
+ self.local_cache
+ .retain(|batch_id, _| batch_id.val >= current_batch_id.val);
+
+ // For consumers, cleanup batches that have been completely consumed
+ // This reduces the memory usage of the shared cache
+ if self.role == CacheRole::Consumer {
+ self.cleanup_consumed_batches();
+ }
+
+ match selected_arrays.len() {
+ 0 => Ok(new_empty_array(self.inner.get_data_type())),
+ 1 => Ok(selected_arrays.into_iter().next().unwrap()),
+ _ => Ok(arrow_select::concat::concat(
+ &selected_arrays
+ .iter()
+ .map(|a| a.as_ref())
+ .collect::<Vec<_>>(),
+ )?),
+ }
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ None // we don't allow nullable parent for now.
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ None
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::arrow::array_reader::row_group_cache::RowGroupCache;
+ use crate::arrow::array_reader::ArrayReader;
+ use arrow_array::{ArrayRef, Int32Array};
+ use std::sync::{Arc, Mutex};
+
+ // Mock ArrayReader for testing
+ struct MockArrayReader {
+ data: Vec<i32>,
+ position: usize,
+ records_to_consume: usize,
+ data_type: ArrowType,
+ }
+
+ impl MockArrayReader {
+ fn new(data: Vec<i32>) -> Self {
+ Self {
+ data,
+ position: 0,
+ records_to_consume: 0,
+ data_type: ArrowType::Int32,
+ }
+ }
+ }
+
+ impl ArrayReader for MockArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+ let remaining = self.data.len() - self.position;
+ let to_read = std::cmp::min(batch_size, remaining);
+ self.records_to_consume += to_read;
+ Ok(to_read)
+ }
+
+ fn consume_batch(&mut self) -> Result<ArrayRef> {
+ let start = self.position;
+ let end = start + self.records_to_consume;
+ let slice = &self.data[start..end];
+ self.position = end;
+ self.records_to_consume = 0;
+ Ok(Arc::new(Int32Array::from(slice.to_vec())))
+ }
+
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ let remaining = self.data.len() - self.position;
+ let to_skip = std::cmp::min(num_records, remaining);
+ self.position += to_skip;
+ Ok(to_skip)
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ None
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ None
+ }
+ }
+
+ #[test]
+ fn test_cached_reader_basic() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+ let mut cached_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache,
+ 0,
+ CacheRole::Producer,
+ metrics,
+ );
+
+ // Read 3 records
+ let records_read = cached_reader.read_records(3).unwrap();
+ assert_eq!(records_read, 3);
+
+ let array = cached_reader.consume_batch().unwrap();
+ assert_eq!(array.len(), 3);
+
+ let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[1, 2, 3]);
+
+ // Read 3 more records
+ let records_read = cached_reader.read_records(3).unwrap();
+ assert_eq!(records_read, 2);
+ }
+
+ #[test]
+ fn test_read_skip_pattern() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5
+ let mut cached_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache,
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ let read1 = cached_reader.read_records(2).unwrap();
+ assert_eq!(read1, 2);
+
+ let array1 = cached_reader.consume_batch().unwrap();
+ assert_eq!(array1.len(), 2);
+ let int32_array = array1.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[1, 2]);
+
+ let skipped = cached_reader.skip_records(2).unwrap();
+ assert_eq!(skipped, 2);
+
+ let read2 = cached_reader.read_records(1).unwrap();
+ assert_eq!(read2, 1);
+
+ // Consume it (should be the 5th element after skipping 3,4)
+ let array2 = cached_reader.consume_batch().unwrap();
+ assert_eq!(array2.len(), 1);
+ let int32_array = array2.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[5]);
+ }
+
+ #[test]
+ fn test_multiple_reads_before_consume() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+ let mut cached_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache,
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ // Multiple reads should accumulate
+ let read1 = cached_reader.read_records(2).unwrap();
+ assert_eq!(read1, 2);
+
+ let read2 = cached_reader.read_records(1).unwrap();
+ assert_eq!(read2, 1);
+
+ // Consume should return all accumulated records
+ let array = cached_reader.consume_batch().unwrap();
+ assert_eq!(array.len(), 3);
+ let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[1, 2, 3]);
+ }
+
+ #[test]
+ fn test_eof_behavior() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5
+ let mut cached_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache,
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ // Try to read more than available
+ let read1 = cached_reader.read_records(5).unwrap();
+ assert_eq!(read1, 3); // Should only get 3 records (all available)
+
+ let array1 = cached_reader.consume_batch().unwrap();
+ assert_eq!(array1.len(), 3);
+
+ // Further reads should return 0
+ let read2 = cached_reader.read_records(1).unwrap();
+ assert_eq!(read2, 0);
+
+ let array2 = cached_reader.consume_batch().unwrap();
+ assert_eq!(array2.len(), 0);
+ }
+
+ #[test]
+ fn test_cache_sharing() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(5, usize::MAX))); // Batch size 5
+
+ // First reader - populate cache
+ let mock_reader1 = MockArrayReader::new(vec![1, 2, 3, 4, 5]);
+ let mut cached_reader1 = CachedArrayReader::new(
+ Box::new(mock_reader1),
+ cache.clone(),
+ 0,
+ CacheRole::Producer,
+ metrics.clone(),
+ );
+
+ cached_reader1.read_records(3).unwrap();
+ let array1 = cached_reader1.consume_batch().unwrap();
+ assert_eq!(array1.len(), 3);
+
+ // Second reader with different column index should not interfere
+ let mock_reader2 = MockArrayReader::new(vec![10, 20, 30, 40, 50]);
+ let mut cached_reader2 = CachedArrayReader::new(
+ Box::new(mock_reader2),
+ cache.clone(),
+ 1,
+ CacheRole::Consumer,
+ metrics.clone(),
+ );
+
+ cached_reader2.read_records(2).unwrap();
+ let array2 = cached_reader2.consume_batch().unwrap();
+ assert_eq!(array2.len(), 2);
+
+ // Verify the second reader got its own data, not from cache
+ let int32_array = array2.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[10, 20]);
+ }
+
+ #[test]
+ fn test_consumer_removes_batches() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+ let mut consumer_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache.clone(),
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ // Read first batch (positions 0-2, batch 0)
+ let read1 = consumer_reader.read_records(3).unwrap();
+ assert_eq!(read1, 3);
+ assert_eq!(consumer_reader.outer_position, 3);
+ // Check that batch 0 is in cache after read_records
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some());
+
+ let array1 = consumer_reader.consume_batch().unwrap();
+ assert_eq!(array1.len(), 3);
+
+ // After first consume_batch, batch 0 should still be in cache
+ // (current_batch_id = 3/3 = 1, cleanup only happens if current_batch_id > 1)
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some());
+
+ // Read second batch (positions 3-5, batch 1)
+ let read2 = consumer_reader.read_records(3).unwrap();
+ assert_eq!(read2, 3);
+ assert_eq!(consumer_reader.outer_position, 6);
+ let array2 = consumer_reader.consume_batch().unwrap();
+ assert_eq!(array2.len(), 3);
+
+ // After second consume_batch, batch 0 should be removed
+ // (current_batch_id = 6/3 = 2, cleanup removes batches 0..(2-1) = 0..1, so removes batch 0)
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none());
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_some());
+
+ // Read third batch (positions 6-8, batch 2)
+ let read3 = consumer_reader.read_records(3).unwrap();
+ assert_eq!(read3, 3);
+ assert_eq!(consumer_reader.outer_position, 9);
+ let array3 = consumer_reader.consume_batch().unwrap();
+ assert_eq!(array3.len(), 3);
+
+ // After third consume_batch, batches 0 and 1 should be removed
+ // (current_batch_id = 9/3 = 3, cleanup removes batches 0..(3-1) = 0..2, so removes batches 0 and 1)
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none());
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_none());
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 2 }).is_some());
+ }
+
+ #[test]
+ fn test_producer_keeps_batches() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+ let mut producer_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache.clone(),
+ 0,
+ CacheRole::Producer,
+ metrics,
+ );
+
+ // Read first batch (positions 0-2)
+ let read1 = producer_reader.read_records(3).unwrap();
+ assert_eq!(read1, 3);
+ let array1 = producer_reader.consume_batch().unwrap();
+ assert_eq!(array1.len(), 3);
+
+ // Verify batch 0 is in cache
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some());
+
+ // Read second batch (positions 3-5) - producer should NOT remove batch 0
+ let read2 = producer_reader.read_records(3).unwrap();
+ assert_eq!(read2, 3);
+ let array2 = producer_reader.consume_batch().unwrap();
+ assert_eq!(array2.len(), 3);
+
+ // Verify both batch 0 and batch 1 are still present (no removal for producer)
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some());
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 1 }).is_some());
+ }
+
+ #[test]
+ fn test_local_cache_protects_against_eviction() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+ let mut cached_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache.clone(),
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ // Read records which should populate both shared and local cache
+ let records_read = cached_reader.read_records(3).unwrap();
+ assert_eq!(records_read, 3);
+
+ // Verify data is in both caches
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_some());
+ assert!(cached_reader.local_cache.contains_key(&BatchID { val: 0 }));
+
+ // Simulate cache eviction by manually removing from shared cache
+ cache.lock().unwrap().remove(0, BatchID { val: 0 });
+ assert!(cache.lock().unwrap().get(0, BatchID { val: 0 }).is_none());
+
+ // Even though the shared cache was evicted, consume_batch should still work
+ // because data is preserved in local cache
+ let array = cached_reader.consume_batch().unwrap();
+ assert_eq!(array.len(), 3);
+
+ let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[1, 2, 3]);
+
+ // Local cache should be cleared after consume_batch
+ assert!(cached_reader.local_cache.is_empty());
+ }
+
+ #[test]
+ fn test_local_cache_is_cleared_properly() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, 0))); // Batch size 3, cache 0
+ let mut cached_reader = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache.clone(),
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ // Read records which should populate both shared and local cache
+ let records_read = cached_reader.read_records(1).unwrap();
+ assert_eq!(records_read, 1);
+ let array = cached_reader.consume_batch().unwrap();
+ assert_eq!(array.len(), 1);
+
+ let records_read = cached_reader.read_records(3).unwrap();
+ assert_eq!(records_read, 3);
+ let array = cached_reader.consume_batch().unwrap();
+ assert_eq!(array.len(), 3);
+ }
+
+ #[test]
+ fn test_batch_id_calculation_with_incremental_reads() {
+ let metrics = ArrowReaderMetrics::disabled();
+ let mock_reader = MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
+ let cache = Arc::new(Mutex::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3
+
+ // Create a producer to populate cache
+ let mut producer = CachedArrayReader::new(
+ Box::new(MockArrayReader::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])),
+ cache.clone(),
+ 0,
+ CacheRole::Producer,
+ metrics.clone(),
+ );
+
+ // Populate cache with first batch (1, 2, 3)
+ producer.read_records(3).unwrap();
+ producer.consume_batch().unwrap();
+
+ // Now create a consumer that will try to read from cache
+ let mut consumer = CachedArrayReader::new(
+ Box::new(mock_reader),
+ cache.clone(),
+ 0,
+ CacheRole::Consumer,
+ metrics,
+ );
+
+ // - We want to read 4 records starting from position 0
+ // - First 3 records (positions 0-2) should come from cache (batch 0)
+ // - The 4th record (position 3) should come from the next batch
+ let records_read = consumer.read_records(4).unwrap();
+ assert_eq!(records_read, 4);
+
+ let array = consumer.consume_batch().unwrap();
+ assert_eq!(array.len(), 4);
+
+ let int32_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int32_array.values(), &[1, 2, 3, 4]);
+ }
+}
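The eviction arithmetic these tests exercise can be sketched standalone. The names below (`batch_id`, `evictable`) are illustrative helpers, not the crate's API; this is a minimal sketch of the rule that a consumer at `outer_position` may drop all batches strictly before `current_batch_id - 1`:

```rust
// Illustrative sketch of the consumer-side eviction rule: batch IDs are
// outer_position / batch_size, and after each consume the consumer may drop
// every batch strictly before the previous one.
fn batch_id(outer_position: usize, batch_size: usize) -> usize {
    outer_position / batch_size
}

/// Range of batch IDs evictable once the consumer has advanced to `outer_position`
fn evictable(outer_position: usize, batch_size: usize) -> std::ops::Range<usize> {
    let current = batch_id(outer_position, batch_size);
    if current > 1 {
        0..current - 1
    } else {
        0..0
    }
}

fn main() {
    // After consuming rows 0..3 (position 3, batch size 3): nothing evictable yet.
    assert_eq!(evictable(3, 3), 0..0);
    // After position 6: batch 0 can go; after position 9: batches 0 and 1.
    assert_eq!(evictable(6, 3), 0..1);
    assert_eq!(evictable(9, 3), 0..2);
}
```

The lag of one batch mirrors the tests above: batch 0 survives the first consume and is only removed once the reader has moved on to batch 2.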
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 66c4f30b3c..e28c93cf62 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -249,6 +249,7 @@ mod tests {
use crate::arrow::array_reader::list_array::ListArrayReader;
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
use crate::arrow::array_reader::ArrayReaderBuilder;
+ use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
use crate::file::properties::WriterProperties;
@@ -563,7 +564,8 @@ mod tests {
)
.unwrap();
- let mut array_reader = ArrayReaderBuilder::new(&file_reader)
+ let metrics = ArrowReaderMetrics::disabled();
+ let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index d6e325b494..5b0ccd874f 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -33,6 +33,7 @@ mod builder;
mod byte_array;
mod byte_array_dictionary;
mod byte_view_array;
+mod cached_array_reader;
mod empty_array;
mod fixed_len_byte_array;
mod fixed_size_list_array;
@@ -40,13 +41,14 @@ mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
+mod row_group_cache;
mod struct_array;
#[cfg(test)]
mod test_util;
// Note that this crate is public under the `experimental` feature flag.
-pub use builder::ArrayReaderBuilder;
+pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)] // Only used for benchmarks
@@ -58,6 +60,7 @@ pub use list_array::ListArrayReader;
pub use map_array::MapArrayReader;
pub use null_array::NullArrayReader;
pub use primitive_array::PrimitiveArrayReader;
+pub use row_group_cache::RowGroupCache;
pub use struct_array::StructArrayReader;
/// Reads Parquet data into Arrow Arrays.
diff --git a/parquet/src/arrow/array_reader/row_group_cache.rs b/parquet/src/arrow/array_reader/row_group_cache.rs
new file mode 100644
index 0000000000..ef726e1649
--- /dev/null
+++ b/parquet/src/arrow/array_reader/row_group_cache.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::{Array, ArrayRef};
+use arrow_schema::DataType;
+use std::collections::HashMap;
+
+/// Identifies a batch of rows within a row group
+///
+/// The row index in the id is relative to the rows being read from the
+/// underlying column reader (which might already have a RowSelection applied)
+///
+/// The `BatchID` for any particular row is `row_index / batch_size`. The
+/// integer division ensures that rows in the same batch share the same
+/// `BatchID`, which can be calculated quickly from the row index
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
+pub struct BatchID {
+ pub val: usize,
+}
+
+/// Cache key that uniquely identifies a batch within a row group
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct CacheKey {
+ /// Column index in the row group
+ pub column_idx: usize,
+ /// Batch within the column
+ pub batch_id: BatchID,
+}
+
+fn get_array_memory_size_for_cache(array: &ArrayRef) -> usize {
+ match array.data_type() {
+ // TODO: this is a temporary workaround. It's very difficult to measure the actual memory usage of one StringViewArray,
+ // because the underlying buffer is shared with multiple StringViewArrays.
+ DataType::Utf8View => {
+ use arrow_array::cast::AsArray;
+ let array = array.as_string_view();
+ array.len() * 16 + array.total_buffer_bytes_used() + std::mem::size_of_val(array)
+ }
+ _ => array.get_array_memory_size(),
+ }
+}
+
+/// Row group cache that stores decoded arrow arrays at batch granularity
+///
+/// This cache is designed to avoid duplicate decoding when the same column
+/// appears in both filter predicates and output projection.
+#[derive(Debug)]
+pub struct RowGroupCache {
+ /// Cache storage mapping (column_idx, batch_id) -> ArrayRef
+ cache: HashMap<CacheKey, ArrayRef>,
+ /// Cache granularity
+ batch_size: usize,
+ /// Maximum cache size in bytes
+ max_cache_bytes: usize,
+ /// Current cache size in bytes
+ current_cache_size: usize,
+}
+
+impl RowGroupCache {
+ /// Creates a new empty row group cache
+ pub fn new(batch_size: usize, max_cache_bytes: usize) -> Self {
+ Self {
+ cache: HashMap::new(),
+ batch_size,
+ max_cache_bytes,
+ current_cache_size: 0,
+ }
+ }
+
+ /// Inserts an array into the cache for the given column and batch ID
+ /// Returns true if the array was inserted, false if it would exceed the cache size limit
+ pub fn insert(&mut self, column_idx: usize, batch_id: BatchID, array: ArrayRef) -> bool {
+ let array_size = get_array_memory_size_for_cache(&array);
+
+ // Check if adding this array would exceed the cache size limit
+ if self.current_cache_size + array_size > self.max_cache_bytes {
+ return false; // Cache is full, don't insert
+ }
+
+ let key = CacheKey {
+ column_idx,
+ batch_id,
+ };
+
+ let existing = self.cache.insert(key, array);
+ assert!(existing.is_none());
+ self.current_cache_size += array_size;
+ true
+ }
+
+ /// Retrieves a cached array for the given column and batch ID
+ /// Returns None if not found in cache
+ pub fn get(&self, column_idx: usize, batch_id: BatchID) -> Option<ArrayRef> {
+ let key = CacheKey {
+ column_idx,
+ batch_id,
+ };
+ self.cache.get(&key).cloned()
+ }
+
+ /// Gets the batch size for this cache
+ pub fn batch_size(&self) -> usize {
+ self.batch_size
+ }
+
+ /// Removes a cached array for the given column and batch ID
+ /// Returns true if the entry was found and removed, false otherwise
+ pub fn remove(&mut self, column_idx: usize, batch_id: BatchID) -> bool {
+ let key = CacheKey {
+ column_idx,
+ batch_id,
+ };
+ if let Some(array) = self.cache.remove(&key) {
+ self.current_cache_size -= get_array_memory_size_for_cache(&array);
+ true
+ } else {
+ false
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{ArrayRef, Int32Array};
+ use std::sync::Arc;
+
+ #[test]
+ fn test_cache_basic_operations() {
+ let mut cache = RowGroupCache::new(1000, usize::MAX);
+
+ // Create test array
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+
+ // Test insert and get
+ let batch_id = BatchID { val: 0 };
+ assert!(cache.insert(0, batch_id, array.clone()));
+ let retrieved = cache.get(0, batch_id);
+ assert!(retrieved.is_some());
+ assert_eq!(retrieved.unwrap().len(), 5);
+
+ // Test miss
+ let miss = cache.get(1, batch_id);
+ assert!(miss.is_none());
+
+ // Test different row_id
+ let miss = cache.get(0, BatchID { val: 1000 });
+ assert!(miss.is_none());
+ }
+
+ #[test]
+ fn test_cache_remove() {
+ let mut cache = RowGroupCache::new(1000, usize::MAX);
+
+ // Create test arrays
+ let array1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
+
+ // Insert arrays
+ assert!(cache.insert(0, BatchID { val: 0 }, array1.clone()));
+ assert!(cache.insert(0, BatchID { val: 1000 }, array2.clone()));
+ assert!(cache.insert(1, BatchID { val: 0 }, array1.clone()));
+
+ // Verify they're there
+ assert!(cache.get(0, BatchID { val: 0 }).is_some());
+ assert!(cache.get(0, BatchID { val: 1000 }).is_some());
+ assert!(cache.get(1, BatchID { val: 0 }).is_some());
+
+ // Remove one entry
+ let removed = cache.remove(0, BatchID { val: 0 });
+ assert!(removed);
+ assert!(cache.get(0, BatchID { val: 0 }).is_none());
+
+ // Other entries should still be there
+ assert!(cache.get(0, BatchID { val: 1000 }).is_some());
+ assert!(cache.get(1, BatchID { val: 0 }).is_some());
+
+ // Try to remove non-existent entry
+ let not_removed = cache.remove(0, BatchID { val: 0 });
+ assert!(!not_removed);
+
+ // Remove remaining entries
+ assert!(cache.remove(0, BatchID { val: 1000 }));
+ assert!(cache.remove(1, BatchID { val: 0 }));
+
+ // Cache should be empty
+ assert!(cache.get(0, BatchID { val: 1000 }).is_none());
+ assert!(cache.get(1, BatchID { val: 0 }).is_none());
+ }
+}
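The size-capped insert policy above (reject the insert rather than evict when `max_cache_bytes` would be exceeded) can be illustrated with a minimal standalone sketch. `SizedCache` and its byte accounting are hypothetical stand-ins for `RowGroupCache`, not the crate's types:

```rust
use std::collections::HashMap;

// Minimal sketch of a size-capped insert policy: an insert that would push
// the running total past `max_bytes` is rejected, never evicted for.
struct SizedCache {
    entries: HashMap<usize, Vec<i32>>,
    current_bytes: usize,
    max_bytes: usize,
}

impl SizedCache {
    fn new(max_bytes: usize) -> Self {
        Self { entries: HashMap::new(), current_bytes: 0, max_bytes }
    }

    fn insert(&mut self, key: usize, value: Vec<i32>) -> bool {
        let size = value.len() * std::mem::size_of::<i32>();
        if self.current_bytes + size > self.max_bytes {
            return false; // cache full: the caller falls back to re-decoding
        }
        self.current_bytes += size;
        self.entries.insert(key, value);
        true
    }
}

fn main() {
    let mut cache = SizedCache::new(16); // room for four i32s
    assert!(cache.insert(0, vec![1, 2, 3])); // 12 bytes, fits
    assert!(!cache.insert(1, vec![4, 5])); // would reach 20 bytes, rejected
    assert_eq!(cache.current_bytes, 12);
}
```

Rejecting instead of evicting keeps the policy simple: a rejected batch is simply decoded again on the output pass, so correctness never depends on what is cached.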
diff --git a/parquet/src/arrow/arrow_reader/metrics.rs b/parquet/src/arrow/arrow_reader/metrics.rs
new file mode 100644
index 0000000000..05c7a51801
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/metrics.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [ArrowReaderMetrics] for collecting metrics about the Arrow reader
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+/// This enum represents the state of Arrow reader metrics collection.
+///
+/// The inner metrics are stored in an `Arc<ArrowReaderMetricsInner>`
+/// so cloning the `ArrowReaderMetrics` enum will not clone the inner metrics.
+///
+/// To access metrics, create an `ArrowReaderMetrics` via [`ArrowReaderMetrics::enabled()`]
+/// and configure the `ArrowReaderBuilder` with a clone.
+#[derive(Debug, Clone)]
+pub enum ArrowReaderMetrics {
+ /// Metrics are not collected (default)
+ Disabled,
+ /// Metrics are collected and stored in an `Arc`.
+ ///
+ /// Create this via [`ArrowReaderMetrics::enabled()`].
+ Enabled(Arc<ArrowReaderMetricsInner>),
+}
+
+impl ArrowReaderMetrics {
+ /// Creates a new instance of [`ArrowReaderMetrics::Disabled`]
+ pub fn disabled() -> Self {
+ Self::Disabled
+ }
+
+ /// Creates a new instance of [`ArrowReaderMetrics::Enabled`]
+ pub fn enabled() -> Self {
+ Self::Enabled(Arc::new(ArrowReaderMetricsInner::new()))
+ }
+
+ /// Predicate Cache: number of records read directly from the inner reader
+ ///
+ /// This is the total number of records read from the inner reader (that is
+ /// actually decoding). It measures the amount of work that could not be
+ /// avoided with caching.
+ ///
+ /// It returns the number of records read across all columns, so if you read
+ /// 2 columns each with 100 records, this will return 200.
+ ///
+ /// Returns None if metrics are disabled.
+ pub fn records_read_from_inner(&self) -> Option<usize> {
+ match self {
+ Self::Disabled => None,
+ Self::Enabled(inner) => Some(
+ inner
+ .records_read_from_inner
+ .load(std::sync::atomic::Ordering::Relaxed),
+ ),
+ }
+ }
+
+ /// Predicate Cache: number of records read from the cache
+ ///
+ /// This is the total number of records read from the cache (avoiding
+ /// actual decoding). It measures the amount of work that was avoided with caching.
+ ///
+ /// It returns the number of records read across all columns, so if you read
+ /// 2 columns each with 100 records from the cache, this will return 200.
+ ///
+ /// Returns None if metrics are disabled.
+ pub fn records_read_from_cache(&self) -> Option<usize> {
+ match self {
+ Self::Disabled => None,
+ Self::Enabled(inner) => Some(
+ inner
+ .records_read_from_cache
+ .load(std::sync::atomic::Ordering::Relaxed),
+ ),
+ }
+ }
+
+ /// Increments the count of records read from the inner reader
+ pub(crate) fn increment_inner_reads(&self, count: usize) {
+ let Self::Enabled(inner) = self else {
+ return;
+ };
+ inner
+ .records_read_from_inner
+ .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
+ }
+
+ /// Increments the count of records read from the cache
+ pub(crate) fn increment_cache_reads(&self, count: usize) {
+ let Self::Enabled(inner) = self else {
+ return;
+ };
+
+ inner
+ .records_read_from_cache
+ .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
+ }
+}
+
+/// Holds the actual metrics for the Arrow reader.
+///
+/// Please see [`ArrowReaderMetrics`] for the public interface.
+#[derive(Debug)]
+pub struct ArrowReaderMetricsInner {
+ // Metrics for Predicate Cache
+ /// Total number of records read from the inner reader (uncached)
+ records_read_from_inner: AtomicUsize,
+ /// Total number of records read from previously cached pages
+ records_read_from_cache: AtomicUsize,
+}
+
+impl ArrowReaderMetricsInner {
+ /// Creates a new instance of `ArrowReaderMetricsInner`
+ pub(crate) fn new() -> Self {
+ Self {
+ records_read_from_inner: AtomicUsize::new(0),
+ records_read_from_cache: AtomicUsize::new(0),
+ }
+ }
+}
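The sharing model described above — clones of an enabled metrics handle share one `Arc`, so increments through any clone are visible to every other clone — can be sketched in miniature. `Metrics` here is an illustrative stand-in for `ArrowReaderMetrics`, with a single counter instead of two:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Sketch of the Arc-shared metrics pattern: the Disabled variant makes every
// increment a no-op, while Enabled clones all point at the same counter.
#[derive(Clone)]
enum Metrics {
    Disabled,
    Enabled(Arc<AtomicUsize>),
}

impl Metrics {
    fn increment(&self, n: usize) {
        if let Metrics::Enabled(counter) = self {
            counter.fetch_add(n, Ordering::Relaxed);
        }
    }

    fn value(&self) -> Option<usize> {
        match self {
            Metrics::Disabled => None,
            Metrics::Enabled(counter) => Some(counter.load(Ordering::Relaxed)),
        }
    }
}

fn main() {
    let metrics = Metrics::Enabled(Arc::new(AtomicUsize::new(0)));
    let handed_to_reader = metrics.clone(); // what the builder would hold
    handed_to_reader.increment(100);
    // The increment is visible through the original handle.
    assert_eq!(metrics.value(), Some(100));
    assert_eq!(Metrics::Disabled.value(), None);
}
```

This is why the builder takes a clone: the caller keeps its own handle and reads the counters after the stream has been driven.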
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d4a3e11e2c..3d20fa0a22 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -42,9 +42,11 @@ use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::schema::types::SchemaDescriptor;
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
mod filter;
+pub mod metrics;
mod read_plan;
mod selection;
pub mod statistics;
@@ -116,6 +118,10 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) limit: Option<usize>,
pub(crate) offset: Option<usize>,
+
+ pub(crate) metrics: ArrowReaderMetrics,
+
+ pub(crate) max_predicate_cache_size: usize,
}
impl<T: Debug> Debug for ArrowReaderBuilder<T> {
@@ -132,6 +138,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
.field("selection", &self.selection)
.field("limit", &self.limit)
.field("offset", &self.offset)
+ .field("metrics", &self.metrics)
.finish()
}
}
@@ -150,6 +157,8 @@ impl<T> ArrowReaderBuilder<T> {
selection: None,
limit: None,
offset: None,
+ metrics: ArrowReaderMetrics::Disabled,
+ max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
}
}
@@ -300,6 +309,65 @@ impl<T> ArrowReaderBuilder<T> {
..self
}
}
+
+ /// Specify metrics collection during reading
+ ///
+ /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
+ /// clone of the provided metrics to the builder.
+ ///
+ /// For example:
+ ///
+ /// ```rust
+ /// # use std::sync::Arc;
+ /// # use bytes::Bytes;
+ /// # use arrow_array::{Int32Array, RecordBatch};
+ /// # use arrow_schema::{DataType, Field, Schema};
+ /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+ /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+ /// # use parquet::arrow::ArrowWriter;
+ /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
+ /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+ /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+ /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+ /// # writer.write(&batch).unwrap();
+ /// # writer.close().unwrap();
+ /// # let file = Bytes::from(file);
+ /// // Create metrics object to pass into the reader
+ /// let metrics = ArrowReaderMetrics::enabled();
+ /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
+ /// // Configure the builder to use the metrics by passing a clone
+ /// .with_metrics(metrics.clone())
+ /// // Build the reader
+ /// .build().unwrap();
+ /// // .. read data from the reader ..
+ ///
+ /// // check the metrics
+ /// assert!(metrics.records_read_from_inner().is_some());
+ /// ```
+ pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
+ Self { metrics, ..self }
+ }
+
+ /// Set the maximum size (per row group) of the predicate cache in bytes for
+ /// the async decoder.
+ ///
+ /// Defaults to 100MB (across all columns). Set to `usize::MAX` to use
+ /// unlimited cache size.
+ ///
+ /// This cache is used to store decoded arrays that are used in
+ /// predicate evaluation ([`Self::with_row_filter`]).
+ ///
+ /// This cache is only used for the "async" decoder, [`ParquetRecordBatchStream`]. See
+ /// [this ticket] for more details and alternatives.
+ ///
+ /// [`ParquetRecordBatchStream`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html
+ /// [this ticket]: https://github.com/apache/arrow-rs/issues/8000
+ pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
+ Self {
+ max_predicate_cache_size,
+ ..self
+ }
+ }
}
/// Options that control how metadata is read for a parquet file
@@ -771,23 +839,37 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
///
/// Note: this will eagerly evaluate any `RowFilter` before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
+ let Self {
+ input,
+ metadata,
+ schema: _,
+ fields,
+ batch_size: _,
+ row_groups,
+ projection,
+ mut filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
+ max_predicate_cache_size: _,
+ } = self;
+
// Try to avoid allocate large buffer
let batch_size = self
.batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
+ .min(metadata.file_metadata().num_rows() as usize);
- let row_groups = self
- .row_groups
- .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
+ let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
let reader = ReaderRowGroups {
- reader: Arc::new(self.input.0),
- metadata: self.metadata,
+ reader: Arc::new(input.0),
+ metadata,
row_groups,
};
- let mut filter = self.filter;
- let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
+ let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
// Update selection based on any filters
if let Some(filter) = filter.as_mut() {
@@ -797,20 +879,23 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
break;
}
- let array_reader = ArrayReaderBuilder::new(&reader)
- .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+ let mut cache_projection = predicate.projection().clone();
+ cache_projection.intersect(&projection);
+
+ let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+ .build_array_reader(fields.as_deref(), predicate.projection())?;
+ plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
}
}
- let array_reader = ArrayReaderBuilder::new(&reader)
- .build_array_reader(self.fields.as_deref(), &self.projection)?;
+ let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+ .build_array_reader(fields.as_deref(), &projection)?;
let read_plan = plan_builder
.limited(reader.num_rows())
- .with_offset(self.offset)
- .with_limit(self.limit)
+ .with_offset(offset)
+ .with_limit(limit)
.build_limited()
.build();
@@ -1005,7 +1090,9 @@ impl ParquetRecordBatchReader {
batch_size: usize,
selection: Option<RowSelection>,
) -> Result<Self> {
- let array_reader = ArrayReaderBuilder::new(row_groups)
+ // note metrics are not supported in this API
+ let metrics = ArrowReaderMetrics::disabled();
+ let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
.build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
let read_plan = ReadPlanBuilder::new(batch_size)
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index c53d47be2e..229eae4c5b 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -441,6 +441,59 @@ impl RowSelection {
pub fn skipped_row_count(&self) -> usize {
self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
}
+
+ /// Expands the selection to align with batch boundaries.
+ /// This is needed when using cached array readers to ensure that
+ /// the cached data covers full batches.
+ #[cfg(feature = "async")]
+ pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
+ if batch_size == 0 {
+ return self.clone();
+ }
+
+ let mut expanded_ranges = Vec::new();
+ let mut row_offset = 0;
+
+ for selector in &self.selectors {
+ if selector.skip {
+ row_offset += selector.row_count;
+ } else {
+ let start = row_offset;
+ let end = row_offset + selector.row_count;
+
+ // Expand start to batch boundary
+ let expanded_start = (start / batch_size) * batch_size;
+ // Expand end to batch boundary
+ let expanded_end = end.div_ceil(batch_size) * batch_size;
+ let expanded_end = expanded_end.min(total_rows);
+
+ expanded_ranges.push(expanded_start..expanded_end);
+ row_offset += selector.row_count;
+ }
+ }
+
+ // Sort ranges by start position
+ expanded_ranges.sort_by_key(|range| range.start);
+
+ // Merge overlapping or consecutive ranges
+ let mut merged_ranges: Vec<Range<usize>> = Vec::new();
+ for range in expanded_ranges {
+ if let Some(last) = merged_ranges.last_mut() {
+ if range.start <= last.end {
+ // Overlapping or consecutive - merge them
+ last.end = last.end.max(range.end);
+ } else {
+ // No overlap - add new range
+ merged_ranges.push(range);
+ }
+ } else {
+ // First range
+ merged_ranges.push(range);
+ }
+ }
+
+ Self::from_consecutive_ranges(merged_ranges.into_iter(), total_rows)
+ }
}
impl From<Vec<RowSelector>> for RowSelection {
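The boundary arithmetic in `expand_to_batch_boundaries` can be checked in isolation; `expand` below is an illustrative helper operating on a single selected range, not part of the crate:

```rust
// Sketch of the per-range boundary arithmetic: a selected row range is widened
// to whole cache batches, then clamped to the row group's total row count.
fn expand(start: usize, end: usize, batch_size: usize, total_rows: usize) -> (usize, usize) {
    // Round the start down and the end up to batch boundaries.
    let expanded_start = (start / batch_size) * batch_size;
    let expanded_end = (end.div_ceil(batch_size) * batch_size).min(total_rows);
    (expanded_start, expanded_end)
}

fn main() {
    // Selecting rows 4..7 with batch_size 3 touches batches 1 and 2, i.e. rows 3..9.
    assert_eq!(expand(4, 7, 3, 100), (3, 9));
    // Expansion never runs past the end of the row group.
    assert_eq!(expand(4, 7, 3, 8), (3, 8));
}
```

Expanding to batch boundaries guarantees the producer decodes every row of a batch it caches, so consumers can reuse the cached batch no matter how the filter's selection slices it.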
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 611d6999e0..eea6176b76 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -26,7 +26,7 @@ use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use bytes::{Buf, Bytes};
@@ -38,7 +38,9 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};
-use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups};
+use crate::arrow::array_reader::{
+ ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
+};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
RowFilter, RowSelection,
@@ -61,6 +63,7 @@ pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
@@ -510,6 +513,8 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
fields: self.fields,
limit: self.limit,
offset: self.offset,
+ metrics: self.metrics,
+ max_predicate_cache_size: self.max_predicate_cache_size,
};
// Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -560,6 +565,12 @@ struct ReaderFactory<T> {
/// Offset to apply to the next
offset: Option<usize>,
+
+ /// Metrics
+ metrics: ArrowReaderMetrics,
+
+ /// Maximum size of the predicate cache
+ max_predicate_cache_size: usize,
}
impl<T> ReaderFactory<T>
@@ -588,6 +599,16 @@ where
.filter(|index| !index.is_empty())
.map(|x| x[row_group_idx].as_slice());
+ // Reuse columns that are selected and used by the filters
+ let cache_projection = match self.compute_cache_projection(&projection) {
+ Some(projection) => projection,
+ None => ProjectionMask::none(meta.columns().len()),
+ };
+ let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
+ batch_size,
+ self.max_predicate_cache_size,
+ )));
+
let mut row_group = InMemoryRowGroup {
// schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
@@ -597,11 +618,16 @@ where
metadata: self.metadata.as_ref(),
};
+ let cache_options_builder = CacheOptionsBuilder::new(&cache_projection, row_group_cache.clone());
+
let filter = self.filter.as_mut();
let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
// Update selection based on any filters
if let Some(filter) = filter {
+ let cache_options = cache_options_builder.clone().producer();
+
for predicate in filter.predicates.iter_mut() {
if !plan_builder.selects_any() {
return Ok((self, None)); // ruled out entire row group
@@ -609,11 +635,20 @@ where
// (pre) Fetch only the columns that are selected by the predicate
let selection = plan_builder.selection();
+ // Fetch predicate columns; expand selection only for cached predicate columns
+ let cache_mask = Some(&cache_projection);
row_group
- .fetch(&mut self.input, predicate.projection(), selection)
+ .fetch(
+ &mut self.input,
+ predicate.projection(),
+ selection,
+ batch_size,
+ cache_mask,
+ )
.await?;
- let array_reader = ArrayReaderBuilder::new(&row_group)
+ let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
+ .with_cache_options(Some(&cache_options))
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
@@ -656,18 +691,69 @@ where
}
// fetch the pages needed for decoding
row_group
- .fetch(&mut self.input, &projection, plan_builder.selection())
+ // Final projection fetch shouldn't expand selection for cache; pass None
+ .fetch(
+ &mut self.input,
+ &projection,
+ plan_builder.selection(),
+ batch_size,
+ None,
+ )
.await?;
let plan = plan_builder.build();
- let array_reader = ArrayReaderBuilder::new(&row_group)
+ let cache_options = cache_options_builder.consumer();
+ let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
+ .with_cache_options(Some(&cache_options))
.build_array_reader(self.fields.as_deref(), &projection)?;
let reader = ParquetRecordBatchReader::new(array_reader, plan);
Ok((self, Some(reader)))
}
+
+ /// Compute which columns are used in filters and the final (output) projection
+ fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
+ let filters = self.filter.as_ref()?;
+ let mut cache_projection = filters.predicates.first()?.projection().clone();
+ for predicate in filters.predicates.iter() {
+ cache_projection.union(predicate.projection());
+ }
+ cache_projection.intersect(projection);
+ self.exclude_nested_columns_from_cache(&cache_projection)
+ }
+
+ /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
+ fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) -> Option<ProjectionMask> {
+ let schema = self.metadata.file_metadata().schema_descr();
+ let num_leaves = schema.num_columns();
+
+ // Count how many leaves each root column has
+ let num_roots = schema.root_schema().get_fields().len();
+ let mut root_leaf_counts = vec![0usize; num_roots];
+ for leaf_idx in 0..num_leaves {
+ let root_idx = schema.get_column_root_idx(leaf_idx);
+ root_leaf_counts[root_idx] += 1;
+ }
+
+ // Keep only leaves whose root has exactly one leaf (non-nested)
+ let mut included_leaves = Vec::new();
+ for leaf_idx in 0..num_leaves {
+ if mask.leaf_included(leaf_idx) {
+ let root_idx = schema.get_column_root_idx(leaf_idx);
+ if root_leaf_counts[root_idx] == 1 {
+ included_leaves.push(leaf_idx);
+ }
+ }
+ }
+
+ if included_leaves.is_empty() {
+ None
+ } else {
+ Some(ProjectionMask::leaves(schema, included_leaves))
+ }
+ }
}
enum StreamState<T> {
@@ -897,9 +983,13 @@ impl InMemoryRowGroup<'_> {
input: &mut T,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
+ batch_size: usize,
+ cache_mask: Option<&ProjectionMask>,
) -> Result<()> {
let metadata = self.metadata.row_group(self.row_group_idx);
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
+ let expanded_selection = selection.expand_to_batch_boundaries(batch_size, self.row_count);
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<u64>> = vec![];
@@ -924,7 +1014,15 @@ impl InMemoryRowGroup<'_> {
_ => (),
}
- ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
+ // Expand selection to batch boundaries only for cached columns
+ let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
+ if use_expanded {
+ ranges.extend(
+ expanded_selection.scan_ranges(&offset_index[idx].page_locations),
+ );
+ } else {
+ ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
+ }
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
ranges
ranges
@@ -1883,6 +1981,8 @@ mod tests {
filter: None,
limit: None,
offset: None,
+ metrics: ArrowReaderMetrics::disabled(),
+ max_predicate_cache_size: 0,
};
let mut skip = true;
@@ -2286,6 +2386,77 @@ mod tests {
assert_eq!(requests.lock().unwrap().len(), 3);
}
+ #[tokio::test]
+ async fn test_cache_projection_excludes_nested_columns() {
+ use arrow_array::{ArrayRef, StringArray};
+
+ // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
+ let a = StringArray::from_iter_values(["r1", "r2"]);
+ let b = StructArray::from(vec![
+ (
+ Arc::new(Field::new("aa", DataType::Utf8, true)),
+ Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("bb", DataType::Utf8, true)),
+ Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
+ ),
+ ]);
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", b.data_type().clone(), true),
+ ]));
+
+ let mut buf = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
+ let batch = RecordBatch::try_from_iter([
+ ("a", Arc::new(a) as ArrayRef),
+ ("b", Arc::new(b) as ArrayRef),
+ ])
+ .unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ // Load Parquet metadata
+ let data: Bytes = buf.into();
+ let metadata = ParquetMetaDataReader::new()
+ .parse_and_finish(&data)
+ .unwrap();
+ let metadata = Arc::new(metadata);
+
+ // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+ // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+ let parquet_schema = metadata.file_metadata().schema_descr();
+ let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
+
+ let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+ Ok(arrow_array::BooleanArray::from(vec![
+ true;
+ batch.num_rows()
+ ]))
+ });
+ let filter = RowFilter::new(vec![Box::new(always_true)]);
+
+ // Construct a ReaderFactory and compute cache projection
+ let reader_factory = ReaderFactory {
+ metadata: Arc::clone(&metadata),
+ fields: None,
+ input: TestReader::new(data),
+ filter: Some(filter),
+ limit: None,
+ offset: None,
+ metrics: ArrowReaderMetrics::disabled(),
+ max_predicate_cache_size: 0,
+ };
+
+ // Provide an output projection that also selects the same nested leaf
+ let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
+
+ // Expect None since nested columns should be excluded from cache projection
+ assert!(cache_projection.is_none());
+ }
+
#[tokio::test]
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
use tokio::fs::File;
@@ -2386,4 +2557,53 @@ mod tests {
let result = reader.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(result.len(), 1);
}
+
+ #[tokio::test]
+ async fn test_cached_array_reader_sparse_offset_error() {
+ use futures::TryStreamExt;
+
+ use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
+ use arrow_array::{BooleanArray, RecordBatch};
+
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+
+ let async_reader = TestReader::new(data);
+
+ // Enable page index so the fetch logic loads only required pages
+ let options = ArrowReaderOptions::new().with_page_index(true);
+ let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+ .await
+ .unwrap();
+
+ // Skip the first 22 rows (entire first Parquet page) and then select the
+ // next 3 rows (22, 23, 24). This means the fetch step will not include
+ // the first page starting at file offset 0.
+ let selection = RowSelection::from(vec![RowSelector::skip(22), RowSelector::select(3)]);
+
+ // Trivial predicate on column 0 that always returns `true`. Using the
+ // same column in both predicate and projection activates the caching
+ // layer (Producer/Consumer pattern).
+ let parquet_schema = builder.parquet_schema();
+ let proj = ProjectionMask::leaves(parquet_schema, vec![0]);
+ let always_true = ArrowPredicateFn::new(proj.clone(), |batch: RecordBatch| {
+ Ok(BooleanArray::from(vec![true; batch.num_rows()]))
+ });
+ let filter = RowFilter::new(vec![Box::new(always_true)]);
+
+ // Build the stream with batch size 8 so the cache reads whole batches
+ // that straddle the requested row range (rows 0-7, 8-15, 16-23, …).
+ let stream = builder
+ .with_batch_size(8)
+ .with_projection(proj)
+ .with_row_selection(selection)
+ .with_row_filter(filter)
+ .build()
+ .unwrap();
+
+ // Collecting the stream should now succeed; before the fix it failed
+ // with the sparse column chunk offset error this test reproduces.
+ let _result: Vec<_> = stream.try_collect().await.unwrap();
+ }
}
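[Editor's note, not part of the patch] The fetch path above widens the row selection to batch boundaries for cached columns via `expand_to_batch_boundaries(batch_size, row_count)`. The idea can be sketched independently of the parquet types; the standalone function signature below (plain half-open `Range`s over row indices) is an illustrative assumption, not the crate's API:

```rust
// Hypothetical sketch: widen each selected row range to the batch boundaries
// that contain it, so cached columns always fetch whole batches. Touching or
// overlapping expanded ranges are merged.
fn expand_to_batch_boundaries(
    selected: &[std::ops::Range<usize>],
    batch_size: usize,
    row_count: usize,
) -> Vec<std::ops::Range<usize>> {
    let mut out: Vec<std::ops::Range<usize>> = Vec::new();
    for r in selected {
        // Round start down and end up to multiples of batch_size, clamped to row_count
        let start = (r.start / batch_size) * batch_size;
        let end = (r.end.div_ceil(batch_size) * batch_size).min(row_count);
        match out.last_mut() {
            Some(last) if start <= last.end => last.end = last.end.max(end),
            _ => out.push(start..end),
        }
    }
    out
}

fn main() {
    // Rows 22..25 selected, batch size 8, 30 rows total:
    // batches starting at 16 and 24 are needed, clamped to 30 -> 16..30
    assert_eq!(expand_to_batch_boundaries(&[22..25], 8, 30), vec![16..30]);
    println!("ok");
}
```

This mirrors why the test `test_cached_array_reader_sparse_offset_error` uses batch size 8 with rows 22..25: the expanded ranges straddle pages the unexpanded selection would not fetch.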
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 33010f4808..72626d70e0 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -276,6 +276,13 @@ impl ProjectionMask {
Self { mask: None }
}
+ /// Create a [`ProjectionMask`] which selects no columns
+ pub fn none(len: usize) -> Self {
+ Self {
+ mask: Some(vec![false; len]),
+ }
+ }
+
/// Create a [`ProjectionMask`] which selects only the specified leaf
columns
///
/// Note: repeated or out of order indices will not impact the final mask
diff --git a/parquet/tests/arrow_reader/mod.rs
b/parquet/tests/arrow_reader/mod.rs
index 48d732f17f..8d72d1def1 100644
--- a/parquet/tests/arrow_reader/mod.rs
+++ b/parquet/tests/arrow_reader/mod.rs
@@ -42,6 +42,8 @@ mod bad_data;
#[cfg(feature = "crc")]
mod checksum;
mod int96_stats_roundtrip;
+#[cfg(feature = "async")]
+mod predicate_cache;
mod statistics;
// returns a struct array with columns "int32_col", "float32_col" and
"float64_col" with the specified values
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs
b/parquet/tests/arrow_reader/predicate_cache.rs
new file mode 100644
index 0000000000..44d43113cb
--- /dev/null
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test for predicate cache in Parquet Arrow reader
+
+use arrow::array::ArrayRef;
+use arrow::array::Int64Array;
+use arrow::compute::and;
+use arrow::compute::kernels::cmp::{gt, lt};
+use arrow_array::cast::AsArray;
+use arrow_array::types::Int64Type;
+use arrow_array::{RecordBatch, StringViewArray};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt};
+use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
+use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use parquet::file::properties::WriterProperties;
+use std::ops::Range;
+use std::sync::Arc;
+use std::sync::LazyLock;
+
+#[tokio::test]
+async fn test_default_read() {
+ // The cache is not used without predicates, so we expect 0 records read from cache
+ let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
+ let sync_builder = test.sync_builder();
+ test.run_sync(sync_builder);
+ let async_builder = test.async_builder().await;
+ test.run_async(async_builder).await;
+}
+
+#[tokio::test]
+async fn test_async_cache_with_filters() {
+ let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49);
+ let async_builder = test.async_builder().await;
+ let async_builder = test.add_project_ab_and_filter_b(async_builder);
+ test.run_async(async_builder).await;
+}
+
+#[tokio::test]
+async fn test_sync_cache_with_filters() {
+ let test = ParquetPredicateCacheTest::new()
+ // The sync reader does not use the cache. See https://github.com/apache/arrow-rs/issues/8000
+ .with_expected_records_read_from_cache(0);
+
+ let sync_builder = test.sync_builder();
+ let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+ test.run_sync(sync_builder);
+}
+
+#[tokio::test]
+async fn test_cache_disabled_with_filters() {
+ // expect no records to be read from cache, because the cache is disabled
+ let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
+ let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
+ let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+ test.run_sync(sync_builder);
+
+ let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
+ let async_builder = test.add_project_ab_and_filter_b(async_builder);
+ test.run_async(async_builder).await;
+}
+
+// -- Begin test infrastructure --
+
+/// A test parquet file
+struct ParquetPredicateCacheTest {
+ bytes: Bytes,
+ expected_records_read_from_cache: usize,
+}
+impl ParquetPredicateCacheTest {
+ /// Create a new `TestParquetFile` with:
+ /// 3 columns: "a", "b", "c"
+ ///
+ /// 2 row groups, each with 200 rows
+ /// each data page has 100 rows
+ ///
+ /// Values of column "a" are 0..399
+ /// Values of column "b" are 400..799
+ /// Values of column "c" are alternating strings of length 12 and longer
+ fn new() -> Self {
+ Self {
+ bytes: TEST_FILE_DATA.clone(),
+ expected_records_read_from_cache: 0,
+ }
+ }
+
+ /// Set the expected number of records read from the cache
+ fn with_expected_records_read_from_cache(
+ mut self,
+ expected_records_read_from_cache: usize,
+ ) -> Self {
+ self.expected_records_read_from_cache = expected_records_read_from_cache;
+ self
+ }
+
+ /// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file
+ fn sync_builder(&self) -> ParquetRecordBatchReaderBuilder<Bytes> {
+ let reader = self.bytes.clone();
+ ParquetRecordBatchReaderBuilder::try_new_with_options(reader, ArrowReaderOptions::default())
+ .expect("ParquetRecordBatchReaderBuilder")
+ }
+
+ /// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file
+ async fn async_builder(&self) -> ParquetRecordBatchStreamBuilder<TestReader> {
+ let reader = TestReader::new(self.bytes.clone());
+ ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
+ .await
+ .unwrap()
+ }
+
+ /// Return a [`ParquetRecordBatchReaderBuilder`] for reading the file with
+ ///
+ /// 1. a projection selecting the "a" and "b" column
+ /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
+ fn add_project_ab_and_filter_b<T>(
+ &self,
+ builder: ArrowReaderBuilder<T>,
+ ) -> ArrowReaderBuilder<T> {
+ let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+ // "b" > 575 and "b" < 625
+ let row_filter = ArrowPredicateFn::new(
+ ProjectionMask::columns(&schema_descr, ["b"]),
+ |batch: RecordBatch| {
+ let scalar_575 = Int64Array::new_scalar(575);
+ let scalar_625 = Int64Array::new_scalar(625);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ and(>(column, &scalar_575)?, <(column, &scalar_625)?)
+ },
+ );
+
+ builder
+ .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+ .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
+ }
+
+ /// Build the reader from the specified builder, reading all batches from it,
+ /// and asserts the expected number of records were read from the cache
+ fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
+ let metrics = ArrowReaderMetrics::enabled();
+
+ let reader = builder.with_metrics(metrics.clone()).build().unwrap();
+ for batch in reader {
+ match batch {
+ Ok(_) => {}
+ Err(e) => panic!("Error reading batch: {e}"),
+ }
+ }
+ self.verify_metrics(metrics)
+ }
+
+ /// Build the reader from the specified builder, reading all batches from it,
+ /// and asserts the expected number of records were read from the cache
+ async fn run_async(&self, builder: ParquetRecordBatchStreamBuilder<TestReader>) {
+ let metrics = ArrowReaderMetrics::enabled();
+
+ let mut stream = builder.with_metrics(metrics.clone()).build().unwrap();
+ while let Some(batch) = stream.next().await {
+ match batch {
+ Ok(_) => {}
+ Err(e) => panic!("Error reading batch: {e}"),
+ }
+ }
+ self.verify_metrics(metrics)
+ }
+
+ fn verify_metrics(&self, metrics: ArrowReaderMetrics) {
+ let Self {
+ bytes: _,
+ expected_records_read_from_cache,
+ } = self;
+
+ let read_from_cache = metrics
+ .records_read_from_cache()
+ .expect("Metrics enabled, so should have metrics");
+
+ assert_eq!(
+ &read_from_cache, expected_records_read_from_cache,
+ "Expected {expected_records_read_from_cache} records read from cache, but got {read_from_cache}"
+ );
+ }
+}
+
+/// Create a parquet file in memory for testing. See [`test_file`] for details.
+static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+ // Input batch has 400 rows, with 3 columns: "a", "b", "c"
+ // Note c is a different type (so the data page sizes will be different)
+ let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+ let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+ let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+ if i % 2 == 0 {
+ format!("string_{i}")
+ } else {
+ format!("A string larger than 12 bytes and thus not inlined {i}")
+ }
+ })));
+
+ let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+ let mut output = Vec::new();
+
+ let writer_options = WriterProperties::builder()
+ .set_max_row_group_size(200)
+ .set_data_page_row_count_limit(100)
+ .build();
+ let mut writer =
+ ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
+
+ // since the limits are only enforced on batch boundaries, write the input
+ // batch in chunks of 50
+ let mut row_remain = input_batch.num_rows();
+ while row_remain > 0 {
+ let chunk_size = row_remain.min(50);
+ let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
+ writer.write(&chunk).unwrap();
+ row_remain -= chunk_size;
+ }
+ writer.close().unwrap();
+ Bytes::from(output)
+});
+
+/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
+/// TODO put this in a common place
+#[derive(Clone)]
+struct TestReader {
+ data: Bytes,
+ metadata: Option<Arc<ParquetMetaData>>,
+}
+
+impl TestReader {
+ fn new(data: Bytes) -> Self {
+ Self {
+ data,
+ metadata: Default::default(),
+ }
+ }
+}
+
+impl AsyncFileReader for TestReader {
+ fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ let range = range.clone();
+ futures::future::ready(Ok(self
+ .data
+ .slice(range.start as usize..range.end as usize)))
+ .boxed()
+ }
+
+ fn get_metadata<'a>(
+ &'a mut self,
+ options: Option<&'a ArrowReaderOptions>,
+ ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ let metadata_reader =
+ ParquetMetaDataReader::new().with_page_indexes(options.is_some_and(|o| o.page_index()));
+ self.metadata = Some(Arc::new(
+ metadata_reader.parse_and_finish(&self.data).unwrap(),
+ ));
+ futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
+ }
+}