This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a535d3bf64 feat: add method for sync Parquet reader read bloom filter 
(#8024)
a535d3bf64 is described below

commit a535d3bf64f3818b4490ee81d5ee364def668ac9
Author: mwish <[email protected]>
AuthorDate: Sat Aug 2 02:46:25 2025 +0800

    feat: add method for sync Parquet reader read bloom filter (#8024)
    
    # Which issue does this PR close?
    
    - Closes #8023
    
    # Rationale for this change
    
    Add a way for the sync Parquet reader to read bloom filters.
    
    # What changes are included in this PR?
    
    Add a sync `get_row_group_column_bloom_filter`
    
    # Are these changes tested?
    
    By unit tests
    
    # Are there any user-facing changes?
    
    API added
---
 parquet/src/arrow/arrow_reader/mod.rs | 114 ++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 900c10659d..d4a3e11e2c 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -30,12 +30,16 @@ pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
+use crate::bloom_filter::{
+    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
 use crate::column::page::{PageIterator, PageReader};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, 
BloomFilterHash};
 use crate::schema::types::SchemaDescriptor;
 
 pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
@@ -703,6 +707,66 @@ impl<T: ChunkReader + 'static> 
ParquetRecordBatchReaderBuilder<T> {
         Self::new_builder(SyncReader(input), metadata)
     }
 
+    /// Read the bloom filter ([`Sbbf`]) for column `column_idx` of row group `row_group_idx`
+    ///
+    /// Returns `Ok(None)` if the column metadata has no bloom filter offset
+    ///
+    /// We should call this function after other forms of pruning, such as 
projection and predicate pushdown.
+    pub fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+
+        let offset: u64 = if let Some(offset) = 
column_metadata.bloom_filter_offset() {
+            offset
+                .try_into()
+                .map_err(|_| ParquetError::General("Bloom filter offset is 
invalid".to_string()))?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = match column_metadata.bloom_filter_length() {
+            Some(length) => self.input.0.get_bytes(offset, length as usize), // length known: one read covers header and bitset
+            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE), // length unknown: read the header only; the bitset is read separately below
+        }?;
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm 
enum
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression 
enum
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // this match exists to future proof the singleton hash enum
+            }
+        }
+
+        let bitset = match column_metadata.bloom_filter_length() {
+            Some(_) => buffer.slice( // bitset starts (bitset_offset - offset) bytes into the buffer read above; NOTE(review): the `unwrap`s below panic if a value does not fit in `usize`
+                (TryInto::<usize>::try_into(bitset_offset).unwrap()
+                    - TryInto::<usize>::try_into(offset).unwrap())..,
+            ),
+            None => {
+                let bitset_length: usize = 
header.num_bytes.try_into().map_err(|_| {
+                    ParquetError::General("Bloom filter length is 
invalid".to_string())
+                })?;
+                self.input.0.get_bytes(bitset_offset, bitset_length)?
+            }
+        };
+        Ok(Some(Sbbf::new(&bitset)))
+    }
+
     /// Build a [`ParquetRecordBatchReader`]
     ///
     /// Note: this will eagerly evaluate any `RowFilter` before returning
@@ -4720,4 +4784,54 @@ mod tests {
         assert_eq!(c0.len(), c1.len());
         c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
     }
+
+    #[test]
+    fn test_get_row_group_column_bloom_filter_with_length() {
+        // Rewrite the test file with bloom filters enabled; the shared helper asserts that bloom_filter_length is then set
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let file = File::open(path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let schema = builder.schema().clone();
+        let reader = builder.build().unwrap();
+
+        let mut parquet_data = Vec::new();
+        let props = WriterProperties::builder()
+            .set_bloom_filter_enabled(true)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, 
Some(props)).unwrap();
+        for batch in reader {
+            let batch = batch.unwrap();
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        // Read the bloom filter back from the rewritten in-memory file
+        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
+    }
+
+    #[test]
+    fn test_get_row_group_column_bloom_filter_without_length() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let data = Bytes::from(std::fs::read(path).unwrap());
+        test_get_row_group_column_bloom_filter(data, false); // `false`: the helper asserts this file's column 0 has no bloom_filter_length
+    }
+
+    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) { // shared checks; `with_length` is whether column 0 must report a bloom_filter_length
+        let mut builder = 
ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
+
+        let metadata = builder.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        let column = row_group.column(0);
+        assert_eq!(column.bloom_filter_length().is_some(), with_length); // fails if the input does not match the caller's `with_length` expectation
+
+        let sbbf = builder
+            .get_row_group_column_bloom_filter(0, 0)
+            .unwrap()
+            .unwrap();
+        assert!(sbbf.check(&"Hello"));
+        assert!(!sbbf.check(&"Hello_Not_Exists"));
+    }
 }

Reply via email to