This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this
push:
new db16cb4d84 [thrift-remodel] Add custom `PageLocation` decoder to speed
up decoding of page indexes (#8190)
db16cb4d84 is described below
commit db16cb4d840a9a28324662b3e1a800e097e2db1b
Author: Ed Seidl <[email protected]>
AuthorDate: Wed Aug 27 12:44:18 2025 -0700
[thrift-remodel] Add custom `PageLocation` decoder to speed up decoding of
page indexes (#8190)
# Which issue does this PR close?
**Note: this targets a feature branch, not main**
We generally require a GitHub issue to be filed for all bug fixes and
enhancements, and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
- Part of #5854.
# Rationale for this change
Add a custom parser for `PageLocation` as the decoding of this struct is
one of several hot spots.
# What changes are included in this PR?
This adds a faster means of obtaining the struct field ids to
`ThriftCompactInputProtocol`. For a small struct (3 fields) with all of
them required, we can save a good bit of time by bypassing
`ThriftCompactInputProtocol::read_field_begin` which is very general and
can handle out-of-order fields, among other things. By adding a new
function `read_field_header`, we can avoid the costly branching that
occurs when calculating the new field id (as well as special handling
needed for boolean fields). Field validation is then handled on the
consuming side while decoding the `PageLocation` struct.
Note that to obtain the speedup seen, we need to assume the fields will
always be in order, and the field ids will all be encoded as field
deltas. This is probably a fairly safe assumption, but there does exist
the possibility of custom thrift writers that use absolute field ids. If
we encounter such a writer in the wild, this change will need to be
reverted.
# Are these changes tested?
These changes should be covered by existing tests.
# Are there any user-facing changes?
None beyond the changes in this branch.
---
parquet/src/file/page_index/index_reader.rs | 11 +++-
parquet/src/file/page_index/offset_index.rs | 88 +++++++++++++++++++++++++++++
parquet/src/parquet_thrift.rs | 13 +++++
parquet/tests/arrow_reader/io/mod.rs | 5 ++
4 files changed, 116 insertions(+), 1 deletion(-)
diff --git a/parquet/src/file/page_index/index_reader.rs
b/parquet/src/file/page_index/index_reader.rs
index f35241689e..99e5963b29 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -133,7 +133,16 @@ pub fn read_offset_indexes<R: ChunkReader>(
pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData,
ParquetError> {
let mut prot = ThriftCompactInputProtocol::new(data);
- OffsetIndexMetaData::try_from(&mut prot)
+
+ // Try to read fast-path first. If that fails, fall back to slower but
more robust
+ // decoder.
+ match OffsetIndexMetaData::try_from_fast(&mut prot) {
+ Ok(offset_index) => Ok(offset_index),
+ Err(_) => {
+ prot = ThriftCompactInputProtocol::new(data);
+ OffsetIndexMetaData::try_from(&mut prot)
+ }
+ }
}
// private struct only used for decoding then discarded
diff --git a/parquet/src/file/page_index/offset_index.rs
b/parquet/src/file/page_index/offset_index.rs
index d4c196a3ae..6cb7539cb5 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -104,4 +104,92 @@ impl OffsetIndexMetaData {
self.unencoded_byte_array_data_bytes.clone(),
)
}
+
+ // Fast-path read of offset index. This works because we expect all field
deltas to be 1,
+ // and there's no nesting beyond PageLocation, so no need to save the last
field id. Like
+ // read_page_locations(), this will fail if absolute field id's are used.
+ pub(super) fn try_from_fast<'a>(prot: &mut ThriftCompactInputProtocol<'a>)
-> Result<Self> {
+ // Offset index is a struct with 2 fields. First field is an array of
PageLocations,
+ // the second an optional array of i64.
+
+ // read field 1 header, then list header, then vec of PageLocations
+ let (field_type, delta) = prot.read_field_header()?;
+ if delta != 1 || field_type != FieldType::List as u8 {
+ return Err(general_err!("error reading
OffsetIndex::page_locations"));
+ }
+
+ // we have to do this manually because we want to use the fast
PageLocation decoder
+ let list_ident = prot.read_list_begin()?;
+ let mut page_locations = Vec::with_capacity(list_ident.size as usize);
+ for _ in 0..list_ident.size {
+ page_locations.push(read_page_location(prot)?);
+ }
+
+ let mut unencoded_byte_array_data_bytes: Option<Vec<i64>> = None;
+
+ // read second field...if it's Stop we're done
+ let (mut field_type, delta) = prot.read_field_header()?;
+ if field_type == FieldType::List as u8 {
+ if delta != 1 {
+ return Err(general_err!(
+ "encountered unknown field while reading OffsetIndex"
+ ));
+ }
+ let vec = Vec::<i64>::try_from(&mut *prot)?;
+ unencoded_byte_array_data_bytes = Some(vec);
+
+ // this one should be Stop
+ (field_type, _) = prot.read_field_header()?;
+ }
+
+ if field_type != FieldType::Stop as u8 {
+ return Err(general_err!(
+ "encountered unknown field while reading OffsetIndex"
+ ));
+ }
+
+ Ok(Self {
+ page_locations,
+ unencoded_byte_array_data_bytes,
+ })
+ }
+}
+
+// hand coding this one because it is very time critical
+
+// Note: this will fail if the fields are either out of order, or if a
suboptimal
+// encoder doesn't use field deltas.
+fn read_page_location<'a>(prot: &mut ThriftCompactInputProtocol<'a>) ->
Result<PageLocation> {
+ // there are 3 fields, all mandatory, so all field deltas should be 1
+ let (field_type, delta) = prot.read_field_header()?;
+ if delta != 1 || field_type != FieldType::I64 as u8 {
+ return Err(general_err!("error reading PageLocation::offset"));
+ }
+ let offset = prot.read_i64()?;
+
+ let (field_type, delta) = prot.read_field_header()?;
+ if delta != 1 || field_type != FieldType::I32 as u8 {
+ return Err(general_err!(
+ "error reading PageLocation::compressed_page_size"
+ ));
+ }
+ let compressed_page_size = prot.read_i32()?;
+
+ let (field_type, delta) = prot.read_field_header()?;
+ if delta != 1 || field_type != FieldType::I64 as u8 {
+ return Err(general_err!("error reading
PageLocation::first_row_index"));
+ }
+ let first_row_index = prot.read_i64()?;
+
+ // read end of struct...return error if there are unknown fields present
+ let (field_type, _) = prot.read_field_header()?;
+ if field_type != FieldType::Stop as u8 {
+ return Err(general_err!("unexpected field in PageLocation"));
+ }
+
+ Ok(PageLocation {
+ offset,
+ compressed_page_size,
+ first_row_index,
+ })
}
diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs
index 7f5fe47521..2dff498372 100644
--- a/parquet/src/parquet_thrift.rs
+++ b/parquet/src/parquet_thrift.rs
@@ -244,6 +244,19 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
Ok(())
}
+ // This is a specialized version of read_field_begin, solely for use in
parsing
+ // PageLocation structs in the offset index. This function assumes that
the delta
+ // field will always be less than 0xf, fields will be in order, and no
boolean fields
+ // will be read. This also skips validation of the field type.
+ //
+ // Returns a tuple of (field_type, field_delta)
+ pub(crate) fn read_field_header(&mut self) -> Result<(u8, u8)> {
+ let field_type = self.read_byte()?;
+ let field_delta = (field_type & 0xf0) >> 4;
+ let field_type = field_type & 0xf;
+ Ok((field_type, field_delta))
+ }
+
pub(crate) fn read_field_begin(&mut self) -> Result<FieldIdentifier> {
// we can read at least one byte, which is:
// - the type
diff --git a/parquet/tests/arrow_reader/io/mod.rs
b/parquet/tests/arrow_reader/io/mod.rs
index 9cafcd714e..bfdb9467e2 100644
--- a/parquet/tests/arrow_reader/io/mod.rs
+++ b/parquet/tests/arrow_reader/io/mod.rs
@@ -298,6 +298,11 @@ impl TestRowGroups {
let start_offset = start_offset as usize;
let end_offset = start_offset + length as usize;
+ let page_locations = page_locations
+ .iter()
+ .map(parquet::format::PageLocation::from)
+ .collect();
+
TestColumnChunk {
name: column_name.clone(),
location: start_offset..end_offset,