This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 537c10adf9 Move bloom filter tests from parquet reader (#9354)
537c10adf9 is described below
commit 537c10adf9d9c94020c8cf01eeabaf1f7559a91d
Author: Kosta Tarasov <[email protected]>
AuthorDate: Wed Feb 4 16:08:21 2026 -0500
Move bloom filter tests from parquet reader (#9354)
# Which issue does this PR close?
- Part of #9269.
- Part of #9348.
# Rationale for this change
See the linked issues: this continues moving reader tests out of the library source and into the `parquet/tests/` integration suite, where they can share common test utilities.
# What changes are included in this PR?
- Moved the bloom filter tests out of the sync and async reader modules into
`parquet/tests/arrow_reader/bloom_filter/` (a sketch of the accessor these
tests exercise is shown below)
- Moved `TestReader` out of `parquet/tests/arrow_reader/predicate_cache.rs`
into the shared `parquet/tests/arrow_reader/io/` module so both test suites
can reuse it
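For reference, a minimal sketch of the accessor the relocated tests exercise
(the helper name and error handling are illustrative, not part of this PR; the
calls mirror those in the diff below):

```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

/// Probe the bloom filter of row group 0, column 0 for `value`.
/// Returns `Ok(None)` when no bloom filter was written for that column.
fn probe_bloom_filter(data: Bytes, value: &str) -> parquet::errors::Result<Option<bool>> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
    Ok(builder
        .get_row_group_column_bloom_filter(0, 0)?
        .map(|sbbf| sbbf.check(&value)))
}
```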
# Are these changes tested?
Covered by the relocated tests themselves; this PR only moves code.
# Are there any user-facing changes?
No; only test code is moved.
---
parquet/src/arrow/arrow_reader/mod.rs | 50 ---------------
parquet/src/arrow/async_reader/mod.rs | 58 -----------------
parquet/tests/arrow_reader/bloom_filter/async.rs | 82 ++++++++++++++++++++++++
parquet/tests/arrow_reader/bloom_filter/mod.rs | 20 ++++++
parquet/tests/arrow_reader/bloom_filter/sync.rs | 74 +++++++++++++++++++++
parquet/tests/arrow_reader/io/mod.rs | 70 ++++++++++++++++++++
parquet/tests/arrow_reader/mod.rs | 1 +
parquet/tests/arrow_reader/predicate_cache.rs | 52 +--------------
8 files changed, 249 insertions(+), 158 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 3a31c69ff3..d039841800 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -5505,56 +5505,6 @@ pub(crate) mod tests {
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
}
- #[test]
- fn test_get_row_group_column_bloom_filter_with_length() {
- // convert to new parquet file with bloom_filter_length
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
- let file = File::open(path).unwrap();
- let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
- let schema = builder.schema().clone();
- let reader = builder.build().unwrap();
-
- let mut parquet_data = Vec::new();
- let props = WriterProperties::builder()
- .set_bloom_filter_enabled(true)
- .build();
- let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
- for batch in reader {
- let batch = batch.unwrap();
- writer.write(&batch).unwrap();
- }
- writer.close().unwrap();
-
- // test the new parquet file
- test_get_row_group_column_bloom_filter(parquet_data.into(), true);
- }
-
- #[test]
- fn test_get_row_group_column_bloom_filter_without_length() {
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
- let data = Bytes::from(std::fs::read(path).unwrap());
- test_get_row_group_column_bloom_filter(data, false);
- }
-
- fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
- let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
-
- let metadata = builder.metadata();
- assert_eq!(metadata.num_row_groups(), 1);
- let row_group = metadata.row_group(0);
- let column = row_group.column(0);
- assert_eq!(column.bloom_filter_length().is_some(), with_length);
-
- let sbbf = builder
- .get_row_group_column_bloom_filter(0, 0)
- .unwrap()
- .unwrap();
- assert!(sbbf.check(&"Hello"));
- assert!(!sbbf.check(&"Hello_Not_Exists"));
- }
-
#[test]
fn test_read_unknown_logical_type() {
let testdata = arrow::util::test_util::parquet_test_data();
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 7aa1ea3b6f..b4824365ea 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1609,14 +1609,6 @@ mod tests {
let _stream = builder.build().unwrap();
}
- #[tokio::test]
- async fn test_get_row_group_column_bloom_filter_without_length() {
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
- let data = Bytes::from(std::fs::read(path).unwrap());
- test_get_row_group_column_bloom_filter(data, false).await;
- }
-
#[tokio::test]
async fn test_parquet_record_batch_stream_schema() {
fn get_all_field_names(schema: &Schema) -> Vec<&String> {
@@ -1725,56 +1717,6 @@ mod tests {
}
}
- #[tokio::test]
- async fn test_get_row_group_column_bloom_filter_with_length() {
- // convert to new parquet file with bloom_filter_length
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
- let data = Bytes::from(std::fs::read(path).unwrap());
- let async_reader = TestReader::new(data.clone());
- let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
- .await
- .unwrap();
- let schema = builder.schema().clone();
- let stream = builder.build().unwrap();
- let batches = stream.try_collect::<Vec<_>>().await.unwrap();
-
- let mut parquet_data = Vec::new();
- let props = WriterProperties::builder()
- .set_bloom_filter_enabled(true)
- .build();
- let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
- for batch in batches {
- writer.write(&batch).unwrap();
- }
- writer.close().unwrap();
-
- // test the new parquet file
- test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
- }
-
- async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
- let async_reader = TestReader::new(data.clone());
-
- let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
- .await
- .unwrap();
-
- let metadata = builder.metadata();
- assert_eq!(metadata.num_row_groups(), 1);
- let row_group = metadata.row_group(0);
- let column = row_group.column(0);
- assert_eq!(column.bloom_filter_length().is_some(), with_length);
-
- let sbbf = builder
- .get_row_group_column_bloom_filter(0, 0)
- .await
- .unwrap()
- .unwrap();
- assert!(sbbf.check(&"Hello"));
- assert!(!sbbf.check(&"Hello_Not_Exists"));
- }
-
#[tokio::test]
async fn test_nested_skip() {
let schema = Arc::new(Schema::new(vec![
diff --git a/parquet/tests/arrow_reader/bloom_filter/async.rs b/parquet/tests/arrow_reader/bloom_filter/async.rs
new file mode 100644
index 0000000000..e230b33d2d
--- /dev/null
+++ b/parquet/tests/arrow_reader/bloom_filter/async.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::TestReader;
+use bytes::Bytes;
+use futures::TryStreamExt;
+use parquet::{
+ arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder},
+ file::properties::WriterProperties,
+};
+
+#[tokio::test]
+async fn test_get_row_group_column_bloom_filter_without_length() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+ test_get_row_group_column_bloom_filter(data, false).await;
+}
+
+#[tokio::test]
+async fn test_get_row_group_column_bloom_filter_with_length() {
+ // convert to new parquet file with bloom_filter_length
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+ let async_reader = TestReader::new(data.clone());
+ let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+ .await
+ .unwrap();
+ let schema = builder.schema().clone();
+ let stream = builder.build().unwrap();
+ let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+
+ let mut parquet_data = Vec::new();
+ let props = WriterProperties::builder()
+ .set_bloom_filter_enabled(true)
+ .build();
+ let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
+ for batch in batches {
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ // test the new parquet file
+ test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
+}
+
+async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
+ let async_reader = TestReader::new(data.clone());
+
+ let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+ .await
+ .unwrap();
+
+ let metadata = builder.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ let column = row_group.column(0);
+ assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+ let sbbf = builder
+ .get_row_group_column_bloom_filter(0, 0)
+ .await
+ .unwrap()
+ .unwrap();
+ assert!(sbbf.check(&"Hello"));
+ assert!(!sbbf.check(&"Hello_Not_Exists"));
+}
diff --git a/parquet/tests/arrow_reader/bloom_filter/mod.rs b/parquet/tests/arrow_reader/bloom_filter/mod.rs
new file mode 100644
index 0000000000..a09fd4a8ac
--- /dev/null
+++ b/parquet/tests/arrow_reader/bloom_filter/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(feature = "async")]
+mod r#async;
+mod sync;
diff --git a/parquet/tests/arrow_reader/bloom_filter/sync.rs b/parquet/tests/arrow_reader/bloom_filter/sync.rs
new file mode 100644
index 0000000000..90d0cdc509
--- /dev/null
+++ b/parquet/tests/arrow_reader/bloom_filter/sync.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+
+use bytes::Bytes;
+use parquet::{
+ arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
+ file::properties::WriterProperties,
+};
+
+#[test]
+fn test_get_row_group_column_bloom_filter_with_length() {
+ // convert to new parquet file with bloom_filter_length
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let file = File::open(path).unwrap();
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ let schema = builder.schema().clone();
+ let reader = builder.build().unwrap();
+
+ let mut parquet_data = Vec::new();
+ let props = WriterProperties::builder()
+ .set_bloom_filter_enabled(true)
+ .build();
+ let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
+ for batch in reader {
+ let batch = batch.unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ // test the new parquet file
+ test_get_row_group_column_bloom_filter(parquet_data.into(), true);
+}
+
+#[test]
+fn test_get_row_group_column_bloom_filter_without_length() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+ test_get_row_group_column_bloom_filter(data, false);
+}
+
+fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
+ let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
+
+ let metadata = builder.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ let column = row_group.column(0);
+ assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+ let sbbf = builder
+ .get_row_group_column_bloom_filter(0, 0)
+ .unwrap()
+ .unwrap();
+ assert!(sbbf.check(&"Hello"));
+ assert!(!sbbf.check(&"Hello_Not_Exists"));
+}
diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs
index 3b11429be4..3e18d7065b 100644
--- a/parquet/tests/arrow_reader/io/mod.rs
+++ b/parquet/tests/arrow_reader/io/mod.rs
@@ -41,13 +41,21 @@ use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringViewArray};
use bytes::Bytes;
+#[cfg(feature = "async")]
+use futures::FutureExt;
+#[cfg(feature = "async")]
+use futures::future::BoxFuture;
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
};
+#[cfg(feature = "async")]
+use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ProjectionMask};
use parquet::data_type::AsBytes;
use parquet::file::FOOTER_SIZE;
use parquet::file::metadata::PageIndexPolicy;
+#[cfg(feature = "async")]
+use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{FooterTail, ParquetMetaData, ParquetOffsetIndex};
use parquet::file::page_index::offset_index::PageLocation;
use parquet::file::properties::WriterProperties;
@@ -77,6 +85,68 @@ fn test_options() -> ArrowReaderOptions {
ArrowReaderOptions::default().with_page_index_policy(PageIndexPolicy::from(true))
}
+/// In-memory [`AsyncFileReader`] implementation for tests.
+#[cfg(feature = "async")]
+#[derive(Clone)]
+pub(crate) struct TestReader {
+ data: Bytes,
+ metadata: Option<Arc<ParquetMetaData>>,
+ requests: Arc<Mutex<Vec<Range<usize>>>>,
+}
+
+#[cfg(feature = "async")]
+impl TestReader {
+ pub(crate) fn new(data: Bytes) -> Self {
+ Self {
+ data,
+ metadata: Default::default(),
+ requests: Default::default(),
+ }
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn requests(&self) -> Vec<Range<usize>> {
+ self.requests.lock().unwrap().clone()
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn clear_requests(&self) {
+ self.requests.lock().unwrap().clear();
+ }
+}
+
+#[cfg(feature = "async")]
+impl AsyncFileReader for TestReader {
+ fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ self.requests
+ .lock()
+ .unwrap()
+ .push(range.start as usize..range.end as usize);
+ futures::future::ready(Ok(self
+ .data
+ .slice(range.start as usize..range.end as usize)))
+ .boxed()
+ }
+
+ fn get_metadata<'a>(
+ &'a mut self,
+ options: Option<&'a ArrowReaderOptions>,
+ ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ let mut metadata_reader = ParquetMetaDataReader::new();
+
+ if let Some(options) = options {
+ metadata_reader = metadata_reader
+ .with_column_index_policy(options.column_index_policy())
+ .with_offset_index_policy(options.offset_index_policy());
+ }
+
+ self.metadata = Some(Arc::new(
+ metadata_reader.parse_and_finish(&self.data).unwrap(),
+ ));
+ futures::future::ready(Ok(self.metadata.clone().unwrap())).boxed()
+ }
+}
+
/// Return a row filter that evaluates "b > 575" AND "b < 625"
///
/// last data page in Row Group 0 and first DataPage in Row Group 1
diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs
index 9acfebda48..ffc36655b3 100644
--- a/parquet/tests/arrow_reader/mod.rs
+++ b/parquet/tests/arrow_reader/mod.rs
@@ -39,6 +39,7 @@ use std::sync::Arc;
use tempfile::NamedTempFile;
mod bad_data;
+mod bloom_filter;
#[cfg(feature = "crc")]
mod checksum;
mod int96_stats_roundtrip;
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index bf3412dd4d..73613c83d2 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -17,6 +17,7 @@
//! Test for predicate cache in Parquet Arrow reader
+use super::io::TestReader;
use arrow::array::ArrayRef;
use arrow::array::Int64Array;
use arrow::compute::and;
@@ -26,16 +27,12 @@ use arrow_array::types::Int64Type;
use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
use arrow_schema::{DataType, Field};
use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt};
+use futures::StreamExt;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
-use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::properties::WriterProperties;
-use std::ops::Range;
use std::sync::Arc;
use std::sync::LazyLock;
@@ -325,48 +322,3 @@ impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
.with_row_filter(row_filter)
}
}
-
-/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
-/// TODO put this in a common place
-#[derive(Clone)]
-struct TestReader {
- data: Bytes,
- metadata: Option<Arc<ParquetMetaData>>,
-}
-
-impl TestReader {
- fn new(data: Bytes) -> Self {
- Self {
- data,
- metadata: Default::default(),
- }
- }
-}
-
-impl AsyncFileReader for TestReader {
- fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
- let range = range.clone();
- futures::future::ready(Ok(self
- .data
- .slice(range.start as usize..range.end as usize)))
- .boxed()
- }
-
- fn get_metadata<'a>(
- &'a mut self,
- options: Option<&'a ArrowReaderOptions>,
- ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
- let mut metadata_reader = ParquetMetaDataReader::new();
-
- if let Some(options) = options {
- metadata_reader = metadata_reader
- .with_column_index_policy(options.column_index_policy())
- .with_offset_index_policy(options.offset_index_policy());
- }
-
- self.metadata = Some(Arc::new(
- metadata_reader.parse_and_finish(&self.data).unwrap(),
- ));
- futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
- }
-}