This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 578030c895 parquet: rle skip decode loop when batch contains all max levels (aka no nulls) (#9258)
578030c895 is described below
commit 578030c895486a300c594b25a7f2bc64841aacd0
Author: Lanqing Yang <[email protected]>
AuthorDate: Thu Feb 5 14:23:07 2026 -0800
parquet: rle skip decode loop when batch contains all max levels (aka no nulls) (#9258)
# Which issue does this PR close?
- Closes #NNN.
# Rationale for this change
Improves Parquet reading performance: when the current RLE run encodes the max definition level (i.e. no nulls) and has enough values left to cover the whole batch, we can skip the decode loop and avoid the count_set_bits overhead of building the null bitmap.
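As a rough illustration (a minimal standalone sketch, not the actual `PackedDecoder` implementation; the function and parameter names here are hypothetical), the fast path boils down to a check like this before entering the decode loop:

```rust
/// Sketch of the fast-path check: if the active RLE run encodes "valid"
/// (max definition level) values and already covers the whole batch,
/// consume it directly instead of decoding level-by-level.
fn try_fast_path(rle_value: bool, rle_left: &mut usize, num_levels: usize) -> Option<usize> {
    if rle_value && *rle_left >= num_levels {
        // All requested levels are non-null: the null bitmap can simply be
        // extended with `num_levels` set bits, with no per-value decoding
        // and no count_set_bits pass over the bitmap.
        *rle_left -= num_levels;
        return Some(num_levels);
    }
    // Null run, bit-packed data, or a run that is too short:
    // fall back to the regular decode loop.
    None
}
```

When this returns `Some(n)`, the caller only needs to append `n` set bits to the bitmap and report `n` values read.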
# What changes are included in this PR?
# Are these changes tested?
# Are there any user-facing changes?
---
.../src/arrow/record_reader/definition_levels.rs | 130 ++++++++++++++++++++-
parquet/src/arrow/record_reader/mod.rs | 2 +-
2 files changed, 126 insertions(+), 6 deletions(-)
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index f51dee5c5c..9e7345e2d7 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -90,13 +90,23 @@ impl DefinitionLevelBuffer {
}
}
- /// Returns the built null bitmask
- pub fn consume_bitmask(&mut self) -> Buffer {
+ /// Returns the built null bitmask, or None if all values are valid
+ pub fn consume_bitmask(&mut self) -> Option<Buffer> {
self.len = 0;
- match &mut self.inner {
- BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
- BufferInner::Mask { nulls } => nulls.finish().into_inner(),
+ let nulls = match &mut self.inner {
+ BufferInner::Full { nulls, .. } => nulls,
+ BufferInner::Mask { nulls } => nulls,
+ };
+
+ // Always call finish() to reset the builder state for the next batch.
+ let buffer = nulls.finish().into_inner();
+
+ // If no bitmap was constructed, return None
+ if buffer.is_empty() {
+ return None;
}
+
+ Some(buffer)
}
pub fn nulls(&self) -> &BooleanBufferBuilder {
@@ -171,6 +181,15 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
(BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
assert_eq!(self.max_level, 1);
+ // Fast path: if all requested levels are valid (max definition level),
+ // we can skip RLE decoding and just append all-ones to the bitmap.
+ // This is faster than decoding RLE data.
+ if let Some(count) = decoder.try_consume_all_valid(num_levels)? {
+ nulls.append_n(count, true);
+ return Ok((count, count)); // values_read == levels_read when all valid
+ }
+
+ // Normal path: decode RLE data into the bitmap
let start = nulls.len();
let levels_read = decoder.read(nulls, num_levels)?;
@@ -286,6 +305,37 @@ impl PackedDecoder {
self.data_offset = 0;
}
+ /// Try to consume `len` levels if all are valid (max definition level).
+ ///
+ /// Returns `Ok(Some(count))` if `count` all-valid levels were successfully consumed.
+ /// Returns `Ok(None)` if there are any nulls or packed data that prevent the fast path.
+ ///
+ /// Note: On `None`, the decoder state may have advanced to the next RLE block,
+ /// but only if `rle_left` was zero (i.e., the block would have been loaded
+ /// on the next read anyway).
+ fn try_consume_all_valid(&mut self, len: usize) -> Result<Option<usize>> {
+ // If no active run and no packed data pending, try to parse the next RLE block
+ if self.rle_left == 0 && self.packed_count == self.packed_offset {
+ if self.data_offset < self.data.len() {
+ self.next_rle_block()?;
+ } else {
+ // No more data available
+ return Ok(None);
+ }
+ }
+
+ // Fast path only works when we have an active RLE run of true values
+ // that covers the entire requested length.
+ if self.rle_left >= len && self.rle_value {
+ self.rle_left -= len;
+ return Ok(Some(len));
+ }
+
+ // Any other case (null run, packed data, or insufficient length)
+ // falls back to the normal path
+ Ok(None)
+ }
+
fn read(&mut self, buffer: &mut BooleanBufferBuilder, len: usize) -> Result<usize> {
let mut read = 0;
while read != len {
@@ -447,4 +497,74 @@ mod tests {
assert_eq!(read_level + skip_level, len);
assert_eq!(read_value + skip_value, total_value);
}
+
+ #[test]
+ fn test_try_consume_all_valid() {
+ // Test with all-valid data (all 1s) - single RLE run
+ let len = 100;
+ let mut encoder = RleEncoder::new(1, 1024);
+ for _ in 0..len {
+ encoder.put(1); // all valid
+ }
+ let encoded = encoder.consume();
+ let mut decoder = PackedDecoder::new();
+ decoder.set_data(Encoding::RLE, encoded.into());
+
+ // try_consume_all_valid now parses the RLE block itself, no need to read first
+ let result = decoder.try_consume_all_valid(len).unwrap();
+ assert_eq!(result, Some(len));
+
+ // Test with all-null data (all 0s)
+ let mut encoder = RleEncoder::new(1, 1024);
+ for _ in 0..len {
+ encoder.put(0); // all null
+ }
+ let encoded = encoder.consume();
+ let mut decoder = PackedDecoder::new();
+ decoder.set_data(Encoding::RLE, encoded.into());
+
+ // Should return None because rle_value is false (all nulls)
+ let result = decoder.try_consume_all_valid(len).unwrap();
+ assert_eq!(result, None);
+
+ // Test when requesting more than available in current RLE run
+ let mut encoder = RleEncoder::new(1, 1024);
+ for _ in 0..10 {
+ encoder.put(1); // small run of valid
+ }
+ for _ in 0..10 {
+ encoder.put(0); // followed by nulls
+ }
+ let encoded = encoder.consume();
+ let mut decoder = PackedDecoder::new();
+ decoder.set_data(Encoding::RLE, encoded.into());
+
+ // Request more than the valid run - should return None
+ // (because we don't look ahead to next block)
+ let result = decoder.try_consume_all_valid(20).unwrap();
+ assert_eq!(result, None);
+
+ // Reset decoder and try requesting within the run
+ decoder.set_data(Encoding::RLE, {
+ let mut encoder = RleEncoder::new(1, 1024);
+ for _ in 0..10 {
+ encoder.put(1);
+ }
+ for _ in 0..10 {
+ encoder.put(0);
+ }
+ encoder.consume().into()
+ });
+
+ let result = decoder.try_consume_all_valid(5).unwrap();
+ assert_eq!(result, Some(5));
+
+ // After skipping 5, we should have 5 left in the valid RLE run
+ let result = decoder.try_consume_all_valid(5).unwrap();
+ assert_eq!(result, Some(5));
+
+ // Now the valid run is exhausted, next call should parse the null run and return None
+ let result = decoder.try_consume_all_valid(5).unwrap();
+ assert_eq!(result, None);
+ }
}
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 758aea6ede..2092b4972d 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -193,7 +193,7 @@ where
let mask = self
.def_levels
.as_mut()
- .map(|levels| levels.consume_bitmask());
+ .and_then(|levels| levels.consume_bitmask());
// While we always consume the bitmask here, we only want to return
// the bitmask for nullable arrays. (Marking nulls on a non-nullable