This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a535d3bf64 feat: add method for sync Parquet reader read bloom filter (#8024)
a535d3bf64 is described below
commit a535d3bf64f3818b4490ee81d5ee364def668ac9
Author: mwish <[email protected]>
AuthorDate: Sat Aug 2 02:46:25 2025 +0800
feat: add method for sync Parquet reader read bloom filter (#8024)
# Which issue does this PR close?
- Closes #8023
# Rationale for this change
Add support for reading bloom filters through the sync Parquet reader.
# What changes are included in this PR?
Add a sync `get_row_group_column_bloom_filter` method to `ParquetRecordBatchReaderBuilder`.
# Are these changes tested?
Yes, by unit tests.
# Are there any user-facing changes?
Yes, a new public API is added.
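For illustration, a minimal sketch of how the new sync method might be called (the `data.parquet` path is a placeholder and error handling is simplified):

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path: any Parquet file written with bloom filters enabled.
    let file = File::open("data.parquet")?;
    let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Read the bloom filter for row group 0, column 0, if one was written.
    if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
        // Bloom filters may report false positives, but never false negatives.
        if sbbf.check(&"Hello") {
            println!("row group 0, column 0 may contain \"Hello\"");
        }
    }
    Ok(())
}
```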
---
parquet/src/arrow/arrow_reader/mod.rs | 114 ++++++++++++++++++++++++++++++++++
1 file changed, 114 insertions(+)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 900c10659d..d4a3e11e2c 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -30,12 +30,16 @@ pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
+use crate::bloom_filter::{
+ chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::schema::types::SchemaDescriptor;
pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
@@ -703,6 +707,66 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
Self::new_builder(SyncReader(input), metadata)
}
+ /// Read bloom filter for a column in a row group
+ ///
+ /// Returns `None` if the column does not have a bloom filter
+ ///
+ /// We should call this function after other forms of pruning, such as projection and predicate pushdown.
+ pub fn get_row_group_column_bloom_filter(
+ &mut self,
+ row_group_idx: usize,
+ column_idx: usize,
+ ) -> Result<Option<Sbbf>> {
+ let metadata = self.metadata.row_group(row_group_idx);
+ let column_metadata = metadata.column(column_idx);
+
+ let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
+ offset
+ .try_into()
+ .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
+ } else {
+ return Ok(None);
+ };
+
+ let buffer = match column_metadata.bloom_filter_length() {
+ Some(length) => self.input.0.get_bytes(offset, length as usize),
+ None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
+ }?;
+
+ let (header, bitset_offset) =
+ chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
+
+ match header.algorithm {
+ BloomFilterAlgorithm::BLOCK(_) => {
+ // this match exists to future proof the singleton algorithm enum
+ }
+ }
+ match header.compression {
+ BloomFilterCompression::UNCOMPRESSED(_) => {
+ // this match exists to future proof the singleton compression enum
+ }
+ }
+ match header.hash {
+ BloomFilterHash::XXHASH(_) => {
+ // this match exists to future proof the singleton hash enum
+ }
+ }
+
+ let bitset = match column_metadata.bloom_filter_length() {
+ Some(_) => buffer.slice(
+ (TryInto::<usize>::try_into(bitset_offset).unwrap()
+ - TryInto::<usize>::try_into(offset).unwrap())..,
+ ),
+ None => {
+ let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+ ParquetError::General("Bloom filter length is invalid".to_string())
+ })?;
+ self.input.0.get_bytes(bitset_offset, bitset_length)?
+ }
+ };
+ Ok(Some(Sbbf::new(&bitset)))
+ }
+
/// Build a [`ParquetRecordBatchReader`]
///
/// Note: this will eagerly evaluate any `RowFilter` before returning
@@ -4720,4 +4784,54 @@ mod tests {
assert_eq!(c0.len(), c1.len());
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
}
+
+ #[test]
+ fn test_get_row_group_column_bloom_filter_with_length() {
+ // rewrite to a new Parquet file that records bloom_filter_length
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let file = File::open(path).unwrap();
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ let schema = builder.schema().clone();
+ let reader = builder.build().unwrap();
+
+ let mut parquet_data = Vec::new();
+ let props = WriterProperties::builder()
+ .set_bloom_filter_enabled(true)
+ .build();
+ let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
+ for batch in reader {
+ let batch = batch.unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ // test the new parquet file
+ test_get_row_group_column_bloom_filter(parquet_data.into(), true);
+ }
+
+ #[test]
+ fn test_get_row_group_column_bloom_filter_without_length() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+ let data = Bytes::from(std::fs::read(path).unwrap());
+ test_get_row_group_column_bloom_filter(data, false);
+ }
+
+ fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
+ let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
+
+ let metadata = builder.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ let column = row_group.column(0);
+ assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+ let sbbf = builder
+ .get_row_group_column_bloom_filter(0, 0)
+ .unwrap()
+ .unwrap();
+ assert!(sbbf.check(&"Hello"));
+ assert!(!sbbf.check(&"Hello_Not_Exists"));
+ }
}