This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new bab30ae3d6 Add ability to skip or transform page encoding statistics in Parquet metadata (#8797)
bab30ae3d6 is described below
commit bab30ae3d61509aa8c73db33010844d440226af2
Author: Ed Seidl <[email protected]>
AuthorDate: Wed Dec 3 11:42:32 2025 -0800
Add ability to skip or transform page encoding statistics in Parquet metadata (#8797)
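
Adds `ParquetStatisticsPolicy` and the `encoding_stats_as_mask` option so that
readers can either skip decoding the `encoding_stats` field of the Parquet
`ColumnMetaData`, or distill it into an `EncodingMask` of the encodings used
for data pages.

A minimal usage sketch based on the API added in this change (the file path
is a placeholder):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::file::metadata::ParquetStatisticsPolicy;

// Decode the page encoding stats as a bitmask, and only for column 0.
let options = ArrowReaderOptions::new()
    .with_encoding_stats_as_mask(true)
    .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
let file = File::open("data.parquet").unwrap(); // placeholder path
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
// Only column 0 will report a `page_encoding_stats_mask`.
```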
---
parquet/benches/metadata.rs | 34 ++++++-
parquet/src/arrow/arrow_reader/mod.rs | 91 ++++++++++++++++-
parquet/src/basic.rs | 15 +++
parquet/src/file/metadata/memory.rs | 12 ++-
parquet/src/file/metadata/mod.rs | 69 +++++++++++--
parquet/src/file/metadata/options.rs | 131 ++++++++++++++++++++++++-
parquet/src/file/metadata/thrift/encryption.rs | 5 +-
parquet/src/file/metadata/thrift/mod.rs | 56 ++++++++---
parquet/src/file/serialized_reader.rs | 79 +++++++++++++++
9 files changed, 467 insertions(+), 25 deletions(-)
diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index 43b08e6b26..c962a4c3fd 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use parquet::basic::{Encoding, PageType, Type as PhysicalType};
 use parquet::file::metadata::{
     ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
-    ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
+    ParquetMetaDataReader, ParquetMetaDataWriter, ParquetStatisticsPolicy, RowGroupMetaData,
 };
use parquet::file::statistics::Statistics;
use parquet::file::writer::TrackedWrite;
@@ -173,6 +173,23 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});
+    let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
+    c.bench_function("decode metadata with stats mask", |b| {
+        b.iter(|| {
+            ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
+                .unwrap();
+        })
+    });
+
+    let options =
+        ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
+    c.bench_function("decode metadata with skip PES", |b| {
+        b.iter(|| {
+            ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
+                .unwrap();
+        })
+    });
+
let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
@@ -187,6 +204,21 @@ fn criterion_benchmark(c: &mut Criterion) {
             ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});
+
+    let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
+    c.bench_function("decode metadata (wide) with stats mask", |b| {
+        b.iter(|| {
+            ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
+        })
+    });
+
+    let options =
+        ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
+    c.bench_function("decode metadata (wide) with skip PES", |b| {
+        b.iter(|| {
+            ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
+        })
+    });
+
}
criterion_group!(benches, criterion_benchmark);
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 2b806db896..a626076ebd 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -42,7 +42,7 @@ use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{
     PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
-    RowGroupMetaData,
+    ParquetStatisticsPolicy, RowGroupMetaData,
 };
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
@@ -557,6 +557,30 @@ impl ArrowReaderOptions {
self
}
+    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
+    /// (defaults to `false`).
+    ///
+    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
+    /// might be desirable.
+    ///
+    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
+    ///     crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
+        self.metadata_options.set_encoding_stats_as_mask(val);
+        self
+    }
+
+    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
+    ///
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
+        self.metadata_options.set_encoding_stats_policy(policy);
+        self
+    }
+
     /// Provide the file decryption properties to use when reading encrypted parquet files.
     ///
     /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
@@ -1420,7 +1444,7 @@ pub(crate) mod tests {
FloatType, Int32Type, Int64Type, Int96, Int96Type,
};
use crate::errors::Result;
- use crate::file::metadata::ParquetMetaData;
+ use crate::file::metadata::{ParquetMetaData, ParquetStatisticsPolicy};
     use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
@@ -1474,6 +1498,69 @@ pub(crate) mod tests {
assert_eq!(expected.as_ref(), builder.metadata.as_ref());
}
+    #[test]
+    fn test_page_encoding_stats_mask() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
+        let file = File::open(path).unwrap();
+
+        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
+        let builder =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
+
+        let row_group_metadata = builder.metadata.row_group(0);
+
+        // test page encoding stats
+        let page_encoding_stats = row_group_metadata
+            .column(0)
+            .page_encoding_stats_mask()
+            .unwrap();
+        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
+        let page_encoding_stats = row_group_metadata
+            .column(2)
+            .page_encoding_stats_mask()
+            .unwrap();
+        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
+    }
+
+    #[test]
+    fn test_page_encoding_stats_skipped() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
+        let file = File::open(path).unwrap();
+
+        // test skipping all
+        let arrow_options =
+            ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            arrow_options,
+        )
+        .unwrap();
+
+        let row_group_metadata = builder.metadata.row_group(0);
+        for column in row_group_metadata.columns() {
+            assert!(column.page_encoding_stats().is_none());
+            assert!(column.page_encoding_stats_mask().is_none());
+        }
+
+        // test skipping all but one column and converting to mask
+        let arrow_options = ArrowReaderOptions::new()
+            .with_encoding_stats_as_mask(true)
+            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            arrow_options,
+        )
+        .unwrap();
+
+        let row_group_metadata = builder.metadata.row_group(0);
+        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
+            assert!(column.page_encoding_stats().is_none());
+            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
+        }
+    }
+
#[test]
fn test_arrow_reader_single_column() {
         let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 9566454cb0..f06cfdb53f 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -737,6 +737,11 @@ impl EncodingMask {
self.0 & (1 << (val as i32)) != 0
}
+ /// Test if this mask has only the bit for the given [`Encoding`] set.
+ pub fn is_only(&self, val: Encoding) -> bool {
+ self.0 == (1 << (val as i32))
+ }
+
/// Test if all [`Encoding`]s in a given set are present in this mask.
     pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a Encoding>) -> bool {
encodings.all(|&e| self.is_set(e))
@@ -2510,4 +2515,14 @@ mod tests {
"Parquet error: Attempt to create invalid mask: 0x2"
);
}
+
+ #[test]
+ fn test_encoding_mask_is_only() {
+ let mask = EncodingMask::new_from_encodings([Encoding::PLAIN].iter());
+ assert!(mask.is_only(Encoding::PLAIN));
+
+        let mask =
+            EncodingMask::new_from_encodings([Encoding::PLAIN, Encoding::PLAIN_DICTIONARY].iter());
+ assert!(!mask.is_only(Encoding::PLAIN));
+ }
}
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 11536bbbd4..30c10e7f22 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -21,7 +21,8 @@
 use crate::basic::{BoundaryOrder, ColumnOrder, Compression, Encoding, PageType};
 use crate::data_type::private::ParquetValueType;
 use crate::file::metadata::{
-    ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, RowGroupMetaData, SortingColumn,
+    ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, ParquetPageEncodingStats,
+    RowGroupMetaData, SortingColumn,
 };
 use crate::file::page_index::column_index::{
     ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
@@ -185,6 +186,15 @@ impl HeapSize for Encoding {
}
}
+impl HeapSize for ParquetPageEncodingStats {
+ fn heap_size(&self) -> usize {
+ match self {
+ Self::Full(v) => v.heap_size(),
+ Self::Mask(_) => 0,
+ }
+ }
+}
+
impl HeapSize for PageEncodingStats {
fn heap_size(&self) -> usize {
self.page_type.heap_size() + self.encoding.heap_size()
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 45b69a6679..6bd426ee67 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -128,7 +128,7 @@ use crate::{
};
pub use footer_tail::FooterTail;
-pub use options::ParquetMetaDataOptions;
+pub use options::{ParquetMetaDataOptions, ParquetStatisticsPolicy};
pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
use std::io::Write;
@@ -470,6 +470,16 @@ pub struct PageEncodingStats {
}
);
+/// Internal representation of the page encoding stats in the [`ColumnChunkMetaData`].
+/// This is not publicly exposed, with different getters defined for each variant.
+#[derive(Debug, Clone, PartialEq)]
+enum ParquetPageEncodingStats {
+ /// The full array of stats as defined in the Parquet spec.
+ Full(Vec<PageEncodingStats>),
+ /// A condensed version of only page encodings seen.
+ Mask(EncodingMask),
+}
+
/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;
@@ -812,7 +822,7 @@ pub struct ColumnChunkMetaData {
dictionary_page_offset: Option<i64>,
statistics: Option<Statistics>,
geo_statistics: Option<Box<geo_statistics::GeospatialStatistics>>,
- encoding_stats: Option<Vec<PageEncodingStats>>,
+ encoding_stats: Option<ParquetPageEncodingStats>,
bloom_filter_offset: Option<i64>,
bloom_filter_length: Option<i32>,
offset_index_offset: Option<i64>,
@@ -1050,10 +1060,47 @@ impl ColumnChunkMetaData {
self.geo_statistics.as_deref()
}
-    /// Returns the offset for the page encoding stats,
-    /// or `None` if no page encoding stats are available.
+    /// Returns the page encoding statistics, or `None` if no page encoding statistics
+    /// are available (or they were converted to a mask).
     pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
-        self.encoding_stats.as_ref()
+        match self.encoding_stats.as_ref() {
+            Some(ParquetPageEncodingStats::Full(stats)) => Some(stats),
+            _ => None,
+        }
+    }
+
+    /// Returns the page encoding statistics reduced to a bitmask, or `None` if statistics are
+    /// not available (or they were left in their original form).
+    ///
+    /// The [`PageEncodingStats`] struct was added to the Parquet specification specifically to
+    /// enable fast determination of whether all pages in a column chunk are dictionary encoded
+    /// (see <https://github.com/apache/parquet-format/pull/16>).
+    /// Decoding the full page encoding statistics, however, can be very costly, and is not
+    /// necessary to support the aforementioned use case. As an alternative, this crate can
+    /// instead distill the list of `PageEncodingStats` down to a bitmask of just the encodings
+    /// used for data pages
+    /// (see [`ParquetMetaDataOptions::set_encoding_stats_as_mask`]).
+    /// To test for an all-dictionary-encoded chunk one could use this bitmask in the following way:
+    ///
+    /// ```rust
+    /// use parquet::basic::Encoding;
+    /// use parquet::file::metadata::ColumnChunkMetaData;
+    /// // test if all data pages in the column chunk are dictionary encoded
+    /// fn is_all_dictionary_encoded(col_meta: &ColumnChunkMetaData) -> bool {
+    ///     // check that dictionary encoding was used
+    ///     col_meta.dictionary_page_offset().is_some()
+    ///         && col_meta.page_encoding_stats_mask().is_some_and(|mask| {
+    ///             // mask should only have one bit set, either for PLAIN_DICTIONARY or
+    ///             // RLE_DICTIONARY
+    ///             mask.is_only(Encoding::PLAIN_DICTIONARY)
+    ///                 || mask.is_only(Encoding::RLE_DICTIONARY)
+    ///         })
+    /// }
+    /// ```
+    pub fn page_encoding_stats_mask(&self) -> Option<&EncodingMask> {
+        match self.encoding_stats.as_ref() {
+            Some(ParquetPageEncodingStats::Mask(stats)) => Some(stats),
+            _ => None,
+        }
     }
/// Returns the offset for the bloom filter.
@@ -1273,8 +1320,18 @@ impl ColumnChunkMetaDataBuilder {
}
     /// Sets page encoding stats for this column chunk.
+    ///
+    /// This will overwrite any existing stats, whether `Vec` based or bitmask.
     pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
-        self.0.encoding_stats = Some(value);
+        self.0.encoding_stats = Some(ParquetPageEncodingStats::Full(value));
+        self
+    }
+
+    /// Sets the page encoding stats mask for this column chunk.
+    ///
+    /// This will overwrite any existing stats, whether `Vec` based or bitmask.
+    pub fn set_page_encoding_stats_mask(mut self, value: EncodingMask) -> Self {
+        self.0.encoding_stats = Some(ParquetPageEncodingStats::Mask(value));
         self
     }
diff --git a/parquet/src/file/metadata/options.rs b/parquet/src/file/metadata/options.rs
index bbc5314d3a..c1ee22ff8d 100644
--- a/parquet/src/file/metadata/options.rs
+++ b/parquet/src/file/metadata/options.rs
@@ -17,8 +17,69 @@
//! Options used to control metadata parsing
+use std::collections::HashSet;
+use std::sync::Arc;
+
use crate::schema::types::SchemaDescPtr;
+/// Enum to control decoding of some Parquet statistics fields.
+///
+/// # Example
+/// ```rust
+/// use parquet::file::metadata::ParquetStatisticsPolicy;
+/// use parquet::file::serialized_reader::ReadOptionsBuilder;
+/// use parquet::arrow::arrow_reader::ArrowReaderOptions;
+///
+/// // Set arrow options to skip encoding statistics for all columns.
+/// let options =
+///     ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
+///
+/// // Set serialized reader options to decode encoding statistics for all columns.
+/// let options =
+///     ReadOptionsBuilder::new().with_encoding_stats_policy(ParquetStatisticsPolicy::KeepAll)
+///         .build();
+///
+/// // Set arrow options to skip encoding statistics for all columns, but to decode statistics
+/// // for columns 0 and 1.
+/// let options = ArrowReaderOptions::new()
+///     .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0, 1]));
+/// ```
+#[derive(Default, Debug, Clone)]
+pub enum ParquetStatisticsPolicy {
+ /// Decode the relevant statistics for all columns.
+ #[default]
+ KeepAll,
+ /// Skip decoding the relevant statistics for all columns.
+ SkipAll,
+    /// Skip decoding the relevant statistics for all columns not in the provided set
+    /// of column indices.
+ SkipExcept(Arc<HashSet<usize>>),
+}
+
+impl ParquetStatisticsPolicy {
+    /// Create a `ParquetStatisticsPolicy` to skip all columns except those in `keep`.
+    ///
+    /// If `keep` is empty, then this returns [`Self::SkipAll`].
+ pub fn skip_except(keep: &[usize]) -> Self {
+ if keep.is_empty() {
+ Self::SkipAll
+ } else {
+ let mut keep_set = HashSet::<usize>::with_capacity(keep.len());
+ keep_set.extend(keep.iter());
+ Self::SkipExcept(Arc::new(keep_set))
+ }
+ }
+
+    /// Returns whether the policy for the given column index is to skip the statistics.
+ pub(crate) fn is_skip(&self, col_index: usize) -> bool {
+ match self {
+ Self::KeepAll => false,
+ Self::SkipAll => true,
+ Self::SkipExcept(keep) => !keep.contains(&col_index),
+ }
+ }
+}
+
 /// Options that can be set to control what parts of the Parquet file footer
 /// metadata will be decoded and made present in the [`ParquetMetaData`] returned
 /// by [`ParquetMetaDataReader`] and [`ParquetMetaDataPushDecoder`].
@@ -29,6 +90,8 @@ use crate::schema::types::SchemaDescPtr;
#[derive(Default, Debug, Clone)]
pub struct ParquetMetaDataOptions {
schema_descr: Option<SchemaDescPtr>,
+ encoding_stats_as_mask: bool,
+ encoding_stats_policy: ParquetStatisticsPolicy,
}
impl ParquetMetaDataOptions {
@@ -48,9 +111,73 @@ impl ParquetMetaDataOptions {
self.schema_descr = Some(val);
}
-    /// Provide a schema to use when decoding the metadata. Returns `Self` for chaining.
+    /// Call [`Self::set_schema`] and return `Self` for chaining.
     pub fn with_schema(mut self, val: SchemaDescPtr) -> Self {
-        self.schema_descr = Some(val);
+        self.set_schema(val);
+        self
+    }
+
+    /// Returns whether to present the [`encoding_stats`] field of the Parquet `ColumnMetaData`
+    /// as a bitmask (defaults to `false`).
+    ///
+    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
+    /// might be desirable.
+    ///
+    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
+    ///     crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn encoding_stats_as_mask(&self) -> bool {
+        self.encoding_stats_as_mask
+    }
+
+    /// Convert [`encoding_stats`] from a vector of [`PageEncodingStats`] to a bitmask. This can
+    /// speed up metadata decoding while still enabling some use cases served by the full stats.
+    ///
+    /// Note that if for a given column both this option and `skip_encoding_stats` are `true`, the
+    /// stats will be skipped and not be returned as a mask.
+    ///
+    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for more information.
+    ///
+    /// [`PageEncodingStats`]: crate::file::metadata::PageEncodingStats
+    /// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
+    ///     crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn set_encoding_stats_as_mask(&mut self, val: bool) {
+        self.encoding_stats_as_mask = val;
+    }
+
+    /// Call [`Self::set_encoding_stats_as_mask`] and return `Self` for chaining.
+    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
+        self.set_encoding_stats_as_mask(val);
+        self
+    }
+
+    /// Returns whether to skip decoding the [`encoding_stats`] in the Parquet `ColumnMetaData`
+    /// for the column indexed by `col_index`.
+    ///
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn skip_encoding_stats(&self, col_index: usize) -> bool {
+        self.encoding_stats_policy.is_skip(col_index)
+    }
+
+    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
+    ///
+    /// The default policy is to decode all `encoding_stats`.
+    ///
+    /// This option takes precedence over [`Self::encoding_stats_as_mask`].
+    ///
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn set_encoding_stats_policy(&mut self, policy: ParquetStatisticsPolicy) {
+        self.encoding_stats_policy = policy;
+    }
+
+    /// Call [`Self::set_encoding_stats_policy`] and return `Self` for chaining.
+    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
+        self.set_encoding_stats_policy(policy);
         self
     }
 }
diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs
index 56c5a6a4b9..9713cf936d 100644
--- a/parquet/src/file/metadata/thrift/encryption.rs
+++ b/parquet/src/file/metadata/thrift/encryption.rs
@@ -113,6 +113,7 @@ pub(crate) struct FileCryptoMetaData<'a> {
fn row_group_from_encrypted_thrift(
mut rg: RowGroupMetaData,
decryptor: Option<&FileDecryptor>,
+ options: Option<&ParquetMetaDataOptions>,
) -> Result<RowGroupMetaData> {
let schema_descr = rg.schema_descr;
@@ -176,7 +177,7 @@ fn row_group_from_encrypted_thrift(
// parse decrypted buffer and then replace fields in 'c'
let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
- let mask = read_column_metadata(&mut prot, &mut c)?;
+ let mask = read_column_metadata(&mut prot, &mut c, i, options)?;
validate_column_metadata(mask)?;
columns.push(c);
@@ -297,7 +298,7 @@ pub(crate) fn parquet_metadata_with_encryption(
// decrypt column chunk info
let row_groups = row_groups
.into_iter()
-        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
+        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref(), options))
.collect::<Result<Vec<_>>>()?;
let metadata = ParquetMetaDataBuilder::new(file_metadata)
diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs
index 225c4d29d2..95ad67da6d 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -43,8 +43,8 @@ use crate::{
file::{
metadata::{
             ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
-            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
-            RowGroupMetaDataBuilder, SortingColumn,
+            PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, ParquetPageEncodingStats,
+            RowGroupMetaData, RowGroupMetaDataBuilder, SortingColumn,
},
statistics::ValueStatistics,
},
@@ -382,15 +382,41 @@ fn validate_column_metadata(mask: u16) -> Result<()> {
Ok(())
}
+fn read_encoding_stats_as_mask<'a>(
+    prot: &mut ThriftSliceInputProtocol<'a>,
+) -> Result<EncodingMask> {
+    // read the vector of stats, setting mask bits for data pages
+    let mut mask = 0i32;
+    let list_ident = prot.read_list_begin()?;
+    for _ in 0..list_ident.size {
+        let pes = PageEncodingStats::read_thrift(prot)?;
+        match pes.page_type {
+            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => mask |= 1 << pes.encoding as i32,
+            _ => {}
+        }
+    }
+    EncodingMask::try_new(mask)
+}
+
 // Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
// This mask can be passed to `validate_column_metadata`.
fn read_column_metadata<'a>(
prot: &mut ThriftSliceInputProtocol<'a>,
column: &mut ColumnChunkMetaData,
+ col_index: usize,
+ options: Option<&ParquetMetaDataOptions>,
) -> Result<u16> {
// mask for seen required fields in ColumnMetaData
let mut seen_mask = 0u16;
+ let mut skip_pes = false;
+ let mut pes_mask = false;
+
+ if let Some(opts) = options {
+ skip_pes = opts.skip_encoding_stats(col_index);
+ pes_mask = opts.encoding_stats_as_mask();
+ }
+
// struct ColumnMetaData {
// 1: required Type type
// 2: required list<Encoding> encodings
@@ -461,10 +487,15 @@ fn read_column_metadata<'a>(
                 column.statistics =
                     convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
             }
-            13 => {
-                let val =
-                    read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
-                column.encoding_stats = Some(val);
+            13 if !skip_pes => {
+                if pes_mask {
+                    let val = read_encoding_stats_as_mask(&mut *prot)?;
+                    column.encoding_stats = Some(ParquetPageEncodingStats::Mask(val));
+                } else {
+                    let val =
+                        read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
+                    column.encoding_stats = Some(ParquetPageEncodingStats::Full(val));
+                }
+            }
             14 => {
                 column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
@@ -499,6 +530,8 @@ fn read_column_metadata<'a>(
fn read_column_chunk<'a>(
prot: &mut ThriftSliceInputProtocol<'a>,
column_descr: &Arc<ColumnDescriptor>,
+ col_index: usize,
+ options: Option<&ParquetMetaDataOptions>,
) -> Result<ColumnChunkMetaData> {
// create a default initialized ColumnMetaData
     let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
@@ -535,7 +568,7 @@ fn read_column_chunk<'a>(
has_file_offset = true;
}
3 => {
-                col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
+                col_meta_mask = read_column_metadata(&mut *prot, &mut col, col_index, options)?;
}
4 => {
col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
@@ -585,6 +618,7 @@ fn read_column_chunk<'a>(
fn read_row_group(
prot: &mut ThriftSliceInputProtocol,
schema_descr: &Arc<SchemaDescriptor>,
+ options: Option<&ParquetMetaDataOptions>,
) -> Result<RowGroupMetaData> {
// create default initialized RowGroupMetaData
     let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
@@ -623,7 +657,7 @@ fn read_row_group(
));
}
for i in 0..list_ident.size as usize {
-                    let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
+                    let col = read_column_chunk(prot, &schema_descr.columns()[i], i, options)?;
row_group.columns.push(col);
}
mask |= RG_COLUMNS;
@@ -774,7 +808,7 @@ pub(crate) fn parquet_metadata_from_bytes(
"Row group ordinal {ordinal} exceeds i16 max
value",
))
})?;
- let rg = read_row_group(&mut prot, schema_descr)?;
+ let rg = read_row_group(&mut prot, schema_descr, options)?;
rg_vec.push(assigner.ensure(ordinal, rg)?);
}
row_groups = Some(rg_vec);
@@ -1686,7 +1720,7 @@ pub(crate) mod tests {
schema_descr: Arc<SchemaDescriptor>,
) -> Result<RowGroupMetaData> {
let mut reader = ThriftSliceInputProtocol::new(buf);
-        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
+        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, None)
}
pub(crate) fn read_column_chunk(
@@ -1694,7 +1728,7 @@ pub(crate) mod tests {
column_descr: Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
let mut reader = ThriftSliceInputProtocol::new(buf);
-        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr)
+        crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr, 0, None)
}
pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ce608c7717..8ef7b972d7 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -160,6 +160,28 @@ impl ReadOptionsBuilder {
self
}
+    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
+    /// (defaults to `false`).
+    ///
+    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
+    /// might be desirable.
+    ///
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
+        self.metadata_options.set_encoding_stats_as_mask(val);
+        self
+    }
+
+    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
+    ///
+    /// [`encoding_stats`]:
+    ///     https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
+    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
+        self.metadata_options.set_encoding_stats_policy(policy);
+        self
+    }
+
/// Seal the builder and return the read options
pub fn build(self) -> ReadOptions {
let props = self
@@ -1857,6 +1879,63 @@ mod tests {
assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
}
+    #[test]
+    fn test_file_reader_page_stats_mask() {
+        let file = get_test_file("alltypes_tiny_pages.parquet");
+        let options = ReadOptionsBuilder::new()
+            .with_encoding_stats_as_mask(true)
+            .build();
+        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
+
+        let row_group_metadata = file_reader.metadata.row_group(0);
+
+        // test page encoding stats
+        let page_encoding_stats = row_group_metadata
+            .column(0)
+            .page_encoding_stats_mask()
+            .unwrap();
+        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
+        let page_encoding_stats = row_group_metadata
+            .column(2)
+            .page_encoding_stats_mask()
+            .unwrap();
+        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
+    }
+
+    #[test]
+    fn test_file_reader_page_stats_skipped() {
+        let file = get_test_file("alltypes_tiny_pages.parquet");
+
+        // test skipping all
+        let options = ReadOptionsBuilder::new()
+            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
+            .build();
+        let file_reader = Arc::new(
+            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
+        );
+
+        let row_group_metadata = file_reader.metadata.row_group(0);
+        for column in row_group_metadata.columns() {
+            assert!(column.page_encoding_stats().is_none());
+            assert!(column.page_encoding_stats_mask().is_none());
+        }
+
+        // test skipping all but one column
+        let options = ReadOptionsBuilder::new()
+            .with_encoding_stats_as_mask(true)
+            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
+            .build();
+        let file_reader = Arc::new(
+            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
+        );
+
+        let row_group_metadata = file_reader.metadata.row_group(0);
+        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
+            assert!(column.page_encoding_stats().is_none());
+            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
+        }
+    }
+
#[test]
fn test_file_reader_with_no_filter() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");