This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch 57_maintenance
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/57_maintenance by this push:
new 3df3157b5b [57_maintenance] [regression] Error with adaptive predicate pushdown: "Invalid offset …(#9301) (#9309)
3df3157b5b is described below
commit 3df3157b5b14b9f99869cec4cd9feedc0a6736c1
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Feb 2 10:04:34 2026 -0500
[57_maintenance] [regression] Error with adaptive predicate pushdown: "Invalid offset …(#9301) (#9309)
- Part of https://github.com/apache/arrow-rs/issues/9240
- Related to https://github.com/apache/arrow-rs/issues/9239
This is a backport of the following PRs to the 57.x release line:
- https://github.com/apache/arrow-rs/pull/9243 from @erratic-pattern (the test)
- https://github.com/apache/arrow-rs/pull/9301 from @sdf-jkl (the fix)
Co-authored-by: Kosta Tarasov <[email protected]>
---
parquet/src/arrow/async_reader/mod.rs | 136 ++++++++++++++++++++-
.../src/arrow/push_decoder/reader_builder/mod.rs | 10 ++
2 files changed, 145 insertions(+), 1 deletion(-)
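For context, the regression surfaced when a row filter was combined with an
explicit row selection and page indexes, so that whole data pages were pruned
during fetch. A minimal sketch of the affected read pattern (builder calls as
in the test below; `reader`, `options`, `row_filter`, `selection`, and
`projection` are placeholders, `futures::TryStreamExt` is assumed in scope for
`try_collect`, and `?` assumes a Result-returning context):

    let stream = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
        .await?
        .with_row_filter(row_filter)     // predicates evaluated while decoding
        .with_row_selection(selection)   // may skip whole pages during fetch
        .with_projection(projection)
        .build()?;
    // Collecting the stream hit the "Invalid offset" error before this fix
    let batches: Vec<RecordBatch> = stream.try_collect().await?;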
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 60f2ca1615..c7d2e8d609 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -782,7 +782,7 @@ mod tests {
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
- use arrow_array::types::Int32Type;
+ use arrow_array::types::{Int32Type, TimestampNanosecondType};
use arrow_array::{
Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
Scalar, StringArray, StructArray, UInt64Array,
@@ -2308,4 +2308,138 @@ mod tests {
Ok(())
}
+
+ /// Regression test for adaptive predicate pushdown attempting to read skipped pages.
+ /// Related issue: https://github.com/apache/arrow-rs/issues/9239
+ #[tokio::test]
+ async fn test_predicate_pushdown_with_skipped_pages() {
+ use arrow_array::TimestampNanosecondArray;
+ use arrow_schema::TimeUnit;
+
+ // Time range constants
+ const TIME_IN_RANGE_START: i64 = 1_704_092_400_000_000_000;
+ const TIME_IN_RANGE_END: i64 = 1_704_110_400_000_000_000;
+ const TIME_BEFORE_RANGE: i64 = 1_704_078_000_000_000_000;
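+ // i.e. 2024-01-01T07:00:00Z, T12:00:00Z, and T03:00:00Z, respectively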
+
+ // Create test data: 2 row groups, 300 rows each
+ // "tag" column: 'a', 'b', 'c' (100 rows each, sorted)
+ // "time" column: alternating in-range/out-of-range timestamps
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "time",
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ false,
+ ),
+ Field::new("tag", DataType::Utf8, false),
+ ]));
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_data_page_row_count_limit(33)
+ .build();
+
+ let mut buffer = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+
+ // Write 2 row groups
+ for _ in 0..2 {
+ for (tag_idx, tag) in ["a", "b", "c"].iter().enumerate() {
+ let times: Vec<i64> = (0..100)
+ .map(|j| {
+ let row_idx = tag_idx * 100 + j;
+ if row_idx % 2 == 0 {
+ TIME_IN_RANGE_START + (j as i64 * 1_000_000)
+ } else {
+ TIME_BEFORE_RANGE + (j as i64 * 1_000_000)
+ }
+ })
+ .collect();
+ let tags: Vec<&str> = (0..100).map(|_| *tag).collect();
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(TimestampNanosecondArray::from(times)) as ArrayRef,
+ Arc::new(StringArray::from(tags)) as ArrayRef,
+ ],
+ )
+ .unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.flush().unwrap();
+ }
+ writer.close().unwrap();
+ let buffer = Bytes::from(buffer);
+ // Read back with each page index policy; all should produce the same answer
+ for policy in [
+ PageIndexPolicy::Skip,
+ PageIndexPolicy::Optional,
+ PageIndexPolicy::Required,
+ ] {
+ println!("Testing with page index policy: {:?}", policy);
+ let reader = TestReader::new(buffer.clone());
+ let options = ArrowReaderOptions::default().with_page_index_policy(policy);
+ let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+ .await
+ .unwrap();
+
+ let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+ let num_row_groups = builder.metadata().num_row_groups();
+
+ // Initial selection: skip middle 100 rows (tag='b') per row group
+ let mut selectors = Vec::new();
+ for _ in 0..num_row_groups {
+ selectors.push(RowSelector::select(100));
+ selectors.push(RowSelector::skip(100));
+ selectors.push(RowSelector::select(100));
+ }
+ let selection = RowSelection::from(selectors);
+
+ // Predicate 1: time >= START
+ let time_gte_predicate =
+ ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+ let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|t| t.map(|v| v >= TIME_IN_RANGE_START)),
+ ))
+ });
+
+ // Predicate 2: time < END
+ let time_lt_predicate =
+ ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+ let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|t| t.map(|v| v < TIME_IN_RANGE_END)),
+ ))
+ });
+
+ let row_filter = RowFilter::new(vec![
+ Box::new(time_gte_predicate),
+ Box::new(time_lt_predicate),
+ ]);
+
+ // Output projection: Only tag column (time not in output)
+ let projection = ProjectionMask::roots(&schema_descr, [1]);
+
+ let stream = builder
+ .with_row_filter(row_filter)
+ .with_row_selection(selection)
+ .with_projection(projection)
+ .build()
+ .unwrap();
+
+ // Stream should complete without error and produce the same results
+ let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+ let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
+ assert_eq!(batch.num_columns(), 1);
+ let expected = StringArray::from_iter_values(
+ std::iter::repeat_n("a", 50)
+ .chain(std::iter::repeat_n("c", 50))
+ .chain(std::iter::repeat_n("a", 50))
+ .chain(std::iter::repeat_n("c", 50)),
+ );
+ assert_eq!(batch.column(0).as_string(), &expected);
+ }
+ }
}
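To make the skipped pages concrete: the test writes ~33-row data pages, so the
per-row-group selection of select(100)/skip(100)/select(100) spans whole pages
that are never fetched when the page index is available. A self-contained
sketch of that selection (import path from the parquet crate; the 100-row
figures mirror the test above):

    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    // Rows 0..100 ("a") and 200..300 ("c") are read; rows 100..200 ("b") are
    // skipped. With ~33-row data pages the skip covers whole pages, and the
    // predicate evaluation must not assume those pages' bytes are present.
    let selection = RowSelection::from(vec![
        RowSelector::select(100),
        RowSelector::skip(100),
        RowSelector::select(100),
    ]);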
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 61a244589c..919f94c892 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -437,6 +437,16 @@ impl RowGroupReaderBuilder {
.with_parquet_metadata(&self.metadata)
.build_array_reader(self.fields.as_deref(), predicate.projection())?;
+ // Prepare to evaluate the filter.
+ // Note: first update the selection strategy to properly handle any pages
+ // pruned during fetch
+ plan_builder = override_selector_strategy_if_needed(
+ plan_builder,
+ predicate.projection(),
+ self.row_group_offset_index(row_group_idx),
+ );
+ // `with_predicate` actually evaluates the filter
+
plan_builder =
plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
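In short, the fix is an ordering guarantee: the selector strategy is re-derived
from the row group's offset index before the predicate runs, so the plan only
addresses pages that were actually fetched. A condensed restatement of the two
calls above (`override_selector_strategy_if_needed` is the helper invoked by
this hunk; its definition is outside this excerpt):

    // Fixed order: first account for pages pruned during fetch ...
    plan_builder = override_selector_strategy_if_needed(
        plan_builder,
        predicate.projection(),
        self.row_group_offset_index(row_group_idx),
    );
    // ... and only then evaluate the filter, which decodes through the plan.
    plan_builder = plan_builder.with_predicate(array_reader, filter_info.current_mut())?;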