This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e42df82a6d [Variant] Simplify `Builder` buffer operations (#7795)
e42df82a6d is described below
commit e42df82a6d1e52f0dbdfa87130c2974ce460af69
Author: Matthew Kim <[email protected]>
AuthorDate: Fri Jun 27 07:07:41 2025 -0400
[Variant] Simplify `Builder` buffer operations (#7795)
# Rationale for this change
This PR simplifies how we build up the internal `ValueBuffer` by
appending to it directly, rather than pre-allocating a buffer filled
with zeroes and writing values at computed indices. This avoids indexing
math that is hard to follow and reason about.
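For illustration, here is a minimal standalone sketch of the difference (the helper mirrors the new `write_offset` in the diff below; the surrounding `main` is hypothetical):

```rust
/// Append an `nbytes`-wide little-endian offset (mirrors the new `write_offset`).
fn write_offset(buf: &mut Vec<u8>, value: usize, nbytes: u8) {
    let bytes = value.to_le_bytes();
    buf.extend_from_slice(&bytes[..nbytes as usize]);
}

fn main() {
    // Old approach: pre-allocate zeroes, then assign at computed indices.
    let mut old = vec![0u8; 2];
    for i in 0..2usize {
        old[i] = (0x1234usize >> (i * 8)) as u8;
    }

    // New approach: append directly, with no index arithmetic to get wrong.
    let mut new = Vec::new();
    write_offset(&mut new, 0x1234, 2);

    assert_eq!(old, new); // both yield [0x34, 0x12]
}
```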
This PR also aims to give `ValueBuffer` a well-defined API: the idea is
that callers should not touch the inner `Vec<u8>` directly, since it is
easy to corrupt.
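A rough sketch of that encapsulation (the struct and method names follow the diff; the usage in `main` is hypothetical):

```rust
#[derive(Default)]
struct ValueBuffer(Vec<u8>);

impl ValueBuffer {
    // Narrow, append-only surface: callers never index into the Vec directly.
    fn append_u8(&mut self, term: u8) {
        self.0.push(term);
    }
    fn append_slice(&mut self, other: &[u8]) {
        self.0.extend_from_slice(other);
    }
    fn into_inner(self) -> Vec<u8> {
        self.0
    }
}

fn main() {
    let mut buf = ValueBuffer::default();
    buf.append_u8(0x0c); // e.g. a primitive header byte (value is illustrative)
    buf.append_slice(&42i32.to_le_bytes()); // a little-endian payload
    assert_eq!(buf.into_inner().len(), 5);
}
```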
# Are there any user-facing changes?
Nope!
---
parquet-variant/src/builder.rs | 313 ++++++++++++++++++-----------------------
1 file changed, 139 insertions(+), 174 deletions(-)
diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs
index 74f8bf2a68..7f26b3279e 100644
--- a/parquet-variant/src/builder.rs
+++ b/parquet-variant/src/builder.rs
@@ -54,30 +54,54 @@ fn int_size(v: usize) -> u8 {
}
/// Write little-endian integer to buffer
-fn write_offset(buf: &mut [u8], value: usize, nbytes: u8) {
- for i in 0..nbytes {
- buf[i as usize] = (value >> (i * 8)) as u8;
- }
+fn write_offset(buf: &mut Vec<u8>, value: usize, nbytes: u8) {
+ let bytes = value.to_le_bytes();
+ buf.extend_from_slice(&bytes[..nbytes as usize]);
}
-/// Helper to make room for header by moving data
-fn make_room_for_header(buffer: &mut Vec<u8>, start_pos: usize, header_size: usize) {
- let current_len = buffer.len();
- buffer.resize(current_len + header_size, 0);
-
- let src_start = start_pos;
- let src_end = current_len;
- let dst_start = start_pos + header_size;
+fn write_header(buf: &mut Vec<u8>, header_byte: u8, is_large: bool, num_items: usize) {
+ buf.push(header_byte);
- buffer.copy_within(src_start..src_end, dst_start);
+ if is_large {
+ let num_items = num_items as u32;
+ buf.extend_from_slice(&num_items.to_le_bytes());
+ } else {
+ let num_items = num_items as u8;
+ buf.push(num_items);
+ };
}
-
#[derive(Default)]
struct ValueBuffer(Vec<u8>);
impl ValueBuffer {
+ fn append_u8(&mut self, term: u8) {
+ self.0.push(term);
+ }
+
+ fn append_slice(&mut self, other: &[u8]) {
+ self.0.extend_from_slice(other);
+ }
+
+    fn append_primitive_header(&mut self, primitive_type: VariantPrimitiveType) {
+ self.0.push(primitive_header(primitive_type));
+ }
+
+ fn inner(&self) -> &[u8] {
+ &self.0
+ }
+
+ fn into_inner(self) -> Vec<u8> {
+ self.0
+ }
+
+ fn inner_mut(&mut self) -> &mut Vec<u8> {
+ &mut self.0
+ }
+
+ // Variant types below
+
fn append_null(&mut self) {
- self.0.push(primitive_header(VariantPrimitiveType::Null));
+ self.append_primitive_header(VariantPrimitiveType::Null);
}
fn append_bool(&mut self, value: bool) {
@@ -86,98 +110,91 @@ impl ValueBuffer {
} else {
VariantPrimitiveType::BooleanFalse
};
- self.0.push(primitive_header(primitive_type));
+ self.append_primitive_header(primitive_type);
}
fn append_int8(&mut self, value: i8) {
- self.0.push(primitive_header(VariantPrimitiveType::Int8));
- self.0.push(value as u8);
+ self.append_primitive_header(VariantPrimitiveType::Int8);
+ self.append_u8(value as u8);
}
fn append_int16(&mut self, value: i16) {
- self.0.push(primitive_header(VariantPrimitiveType::Int16));
- self.0.extend_from_slice(&value.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Int16);
+ self.append_slice(&value.to_le_bytes());
}
fn append_int32(&mut self, value: i32) {
- self.0.push(primitive_header(VariantPrimitiveType::Int32));
- self.0.extend_from_slice(&value.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Int32);
+ self.append_slice(&value.to_le_bytes());
}
fn append_int64(&mut self, value: i64) {
- self.0.push(primitive_header(VariantPrimitiveType::Int64));
- self.0.extend_from_slice(&value.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Int64);
+ self.append_slice(&value.to_le_bytes());
}
fn append_float(&mut self, value: f32) {
- self.0.push(primitive_header(VariantPrimitiveType::Float));
- self.0.extend_from_slice(&value.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Float);
+ self.append_slice(&value.to_le_bytes());
}
fn append_double(&mut self, value: f64) {
- self.0.push(primitive_header(VariantPrimitiveType::Double));
- self.0.extend_from_slice(&value.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Double);
+ self.append_slice(&value.to_le_bytes());
}
fn append_date(&mut self, value: chrono::NaiveDate) {
- self.0.push(primitive_header(VariantPrimitiveType::Date));
+ self.append_primitive_header(VariantPrimitiveType::Date);
        let days_since_epoch = value.signed_duration_since(UNIX_EPOCH_DATE).num_days() as i32;
- self.0.extend_from_slice(&days_since_epoch.to_le_bytes());
+ self.append_slice(&days_since_epoch.to_le_bytes());
}
    fn append_timestamp_micros(&mut self, value: chrono::DateTime<chrono::Utc>) {
- self.0
- .push(primitive_header(VariantPrimitiveType::TimestampMicros));
+ self.append_primitive_header(VariantPrimitiveType::TimestampMicros);
let micros = value.timestamp_micros();
-        self.0.extend_from_slice(&micros.to_le_bytes());
+        self.append_slice(&micros.to_le_bytes());
}
fn append_timestamp_ntz_micros(&mut self, value: chrono::NaiveDateTime) {
- self.0
- .push(primitive_header(VariantPrimitiveType::TimestampNtzMicros));
+ self.append_primitive_header(VariantPrimitiveType::TimestampNtzMicros);
let micros = value.and_utc().timestamp_micros();
-        self.0.extend_from_slice(&micros.to_le_bytes());
+        self.append_slice(&micros.to_le_bytes());
}
fn append_decimal4(&mut self, integer: i32, scale: u8) {
- self.0
- .push(primitive_header(VariantPrimitiveType::Decimal4));
- self.0.push(scale);
- self.0.extend_from_slice(&integer.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Decimal4);
+ self.append_u8(scale);
+ self.append_slice(&integer.to_le_bytes());
}
fn append_decimal8(&mut self, integer: i64, scale: u8) {
- self.0
- .push(primitive_header(VariantPrimitiveType::Decimal8));
- self.0.push(scale);
- self.0.extend_from_slice(&integer.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Decimal8);
+ self.append_u8(scale);
+ self.append_slice(&integer.to_le_bytes());
}
fn append_decimal16(&mut self, integer: i128, scale: u8) {
- self.0
- .push(primitive_header(VariantPrimitiveType::Decimal16));
- self.0.push(scale);
- self.0.extend_from_slice(&integer.to_le_bytes());
+ self.append_primitive_header(VariantPrimitiveType::Decimal16);
+ self.append_u8(scale);
+ self.append_slice(&integer.to_le_bytes());
}
fn append_binary(&mut self, value: &[u8]) {
- self.0.push(primitive_header(VariantPrimitiveType::Binary));
- self.0
- .extend_from_slice(&(value.len() as u32).to_le_bytes());
- self.0.extend_from_slice(value);
+ self.append_primitive_header(VariantPrimitiveType::Binary);
+ self.append_slice(&(value.len() as u32).to_le_bytes());
+ self.append_slice(value);
}
fn append_short_string(&mut self, value: ShortString) {
let inner = value.0;
- self.0.push(short_string_header(inner.len()));
- self.0.extend_from_slice(inner.as_bytes());
+ self.append_u8(short_string_header(inner.len()));
+ self.append_slice(inner.as_bytes());
}
fn append_string(&mut self, value: &str) {
- self.0.push(primitive_header(VariantPrimitiveType::String));
- self.0
- .extend_from_slice(&(value.len() as u32).to_le_bytes());
- self.0.extend_from_slice(value.as_bytes());
+ self.append_primitive_header(VariantPrimitiveType::String);
+ self.append_slice(&(value.len() as u32).to_le_bytes());
+ self.append_slice(value.as_bytes());
}
fn offset(&self) -> usize {
@@ -227,8 +244,8 @@ struct MetadataBuilder {
}
impl MetadataBuilder {
- /// Add field name to dictionary, return its ID
- fn add_field_name(&mut self, field_name: &str) -> u32 {
+ /// Upsert field name to dictionary, return its ID
+ fn upsert_field_name(&mut self, field_name: &str) -> u32 {
use std::collections::btree_map::Entry;
match self.field_name_to_id.entry(field_name.to_string()) {
Entry::Occupied(entry) => *entry.get(),
@@ -248,6 +265,45 @@ impl MetadataBuilder {
fn metadata_size(&self) -> usize {
self.field_names.iter().map(|k| k.len()).sum()
}
+
+ fn finish(self) -> Vec<u8> {
+ let nkeys = self.num_field_names();
+
+ // Calculate metadata size
+ let total_dict_size: usize = self.metadata_size();
+
+        // Determine appropriate offset size based on the larger of dict size or total string size
+ let max_offset = std::cmp::max(total_dict_size, nkeys);
+ let offset_size = int_size(max_offset);
+
+ let offset_start = 1 + offset_size as usize;
+ let string_start = offset_start + (nkeys + 1) * offset_size as usize;
+ let metadata_size = string_start + total_dict_size;
+
+ let mut metadata = Vec::with_capacity(metadata_size);
+
+ // Write header: version=1, not sorted, with calculated offset_size
+ metadata.push(0x01 | ((offset_size - 1) << 6));
+
+ // Write dictionary size
+ write_offset(&mut metadata, nkeys, offset_size);
+
+ // Write offsets
+ let mut cur_offset = 0;
+ for key in self.field_names.iter() {
+ write_offset(&mut metadata, cur_offset, offset_size);
+ cur_offset += key.len();
+ }
+ // Write final offset
+ write_offset(&mut metadata, cur_offset, offset_size);
+
+ // Write string data
+ for key in self.field_names.iter() {
+ metadata.extend_from_slice(key.as_bytes());
+ }
+
+ metadata
+ }
}
/// Top level builder for [`Variant`] values
@@ -388,6 +444,7 @@ impl MetadataBuilder {
/// );
///
/// ```
+#[derive(Default)]
pub struct VariantBuilder {
buffer: ValueBuffer,
metadata_builder: MetadataBuilder,
@@ -420,54 +477,7 @@ impl VariantBuilder {
}
pub fn finish(self) -> (Vec<u8>, Vec<u8>) {
- let nkeys = self.metadata_builder.num_field_names();
-
- // Calculate metadata size
- let total_dict_size: usize = self.metadata_builder.metadata_size();
-
-        // Determine appropriate offset size based on the larger of dict size or total string size
- let max_offset = std::cmp::max(total_dict_size, nkeys);
- let offset_size = int_size(max_offset);
-
- let offset_start = 1 + offset_size as usize;
- let string_start = offset_start + (nkeys + 1) * offset_size as usize;
- let metadata_size = string_start + total_dict_size;
-
- // Pre-allocate exact size to avoid reallocations
- let mut metadata = vec![0u8; metadata_size];
-
- // Write header: version=1, not sorted, with calculated offset_size
- metadata[0] = 0x01 | ((offset_size - 1) << 6);
-
- // Write dictionary size
- write_offset(&mut metadata[1..], nkeys, offset_size);
-
- // Write offsets and string data
- let mut cur_offset = 0;
- for (i, key) in self.metadata_builder.field_names.iter().enumerate() {
- write_offset(
- &mut metadata[offset_start + i * offset_size as usize..],
- cur_offset,
- offset_size,
- );
- let start = string_start + cur_offset;
- metadata[start..start + key.len()].copy_from_slice(key.as_bytes());
- cur_offset += key.len();
- }
- // Write final offset
- write_offset(
- &mut metadata[offset_start + nkeys * offset_size as usize..],
- cur_offset,
- offset_size,
- );
-
- (metadata, self.buffer.0)
- }
-}
-
-impl Default for VariantBuilder {
- fn default() -> Self {
- Self::new()
+ (self.metadata_builder.finish(), self.buffer.into_inner())
}
}
@@ -537,40 +547,23 @@ impl<'a> ListBuilder<'a> {
let data_size = self.buffer.offset();
let num_elements = self.offsets.len() - 1;
let is_large = num_elements > u8::MAX as usize;
- let size_bytes = if is_large { 4 } else { 1 };
let offset_size = int_size(data_size);
-        let header_size = 1 + size_bytes + (num_elements + 1) * offset_size as usize;
-
- let parent_start_pos = self.parent_buffer.offset();
-
-        make_room_for_header(&mut self.parent_buffer.0, parent_start_pos, header_size);
// Write header
- let mut pos = parent_start_pos;
- self.parent_buffer.0[pos] = array_header(is_large, offset_size);
- pos += 1;
-
- if is_large {
- self.parent_buffer.0[pos..pos + 4]
- .copy_from_slice(&(num_elements as u32).to_le_bytes());
- pos += 4;
- } else {
- self.parent_buffer.0[pos] = num_elements as u8;
- pos += 1;
- }
+ write_header(
+ self.parent_buffer.inner_mut(),
+ array_header(is_large, offset_size),
+ is_large,
+ num_elements,
+ );
// Write offsets
for offset in &self.offsets {
- write_offset(
- &mut self.parent_buffer.0[pos..pos + offset_size as usize],
- *offset,
- offset_size,
- );
- pos += offset_size as usize;
+ write_offset(self.parent_buffer.inner_mut(), *offset, offset_size);
}
// Append values
- self.parent_buffer.0.extend_from_slice(&self.buffer.0);
+ self.parent_buffer.append_slice(self.buffer.inner());
}
}
@@ -602,7 +595,7 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
return;
};
- let field_id = self.metadata_builder.add_field_name(field_name);
+ let field_id = self.metadata_builder.upsert_field_name(field_name);
self.fields.insert(field_id, *field_start);
self.pending = None;
@@ -615,7 +608,7 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
    pub fn insert<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, key: &str, value: T) {
self.check_pending_field();
- let field_id = self.metadata_builder.add_field_name(key);
+ let field_id = self.metadata_builder.upsert_field_name(key);
let field_start = self.buffer.offset();
self.fields.insert(field_id, field_start);
@@ -655,7 +648,6 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
let data_size = self.buffer.offset();
let num_fields = self.fields.len();
let is_large = num_fields > u8::MAX as usize;
- let size_bytes = if is_large { 4 } else { 1 };
let field_ids_by_sorted_field_name = self
.metadata_builder
@@ -669,55 +661,28 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
let id_size = int_size(max_id);
let offset_size = int_size(data_size);
- let header_size = 1
- + size_bytes
- + num_fields * id_size as usize
- + (num_fields + 1) * offset_size as usize;
-
- let parent_start_pos = self.parent_buffer.offset();
-
-        make_room_for_header(&mut self.parent_buffer.0, parent_start_pos, header_size);
-
// Write header
- let mut pos = parent_start_pos;
-        self.parent_buffer.0[pos] = object_header(is_large, id_size, offset_size);
- pos += 1;
-
- if is_large {
-            self.parent_buffer.0[pos..pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
- pos += 4;
- } else {
- self.parent_buffer.0[pos] = num_fields as u8;
- pos += 1;
- }
+ write_header(
+ self.parent_buffer.inner_mut(),
+ object_header(is_large, id_size, offset_size),
+ is_large,
+ num_fields,
+ );
// Write field IDs (sorted order)
for id in &field_ids_by_sorted_field_name {
- write_offset(
- &mut self.parent_buffer.0[pos..pos + id_size as usize],
- *id as usize,
- id_size,
- );
- pos += id_size as usize;
+            write_offset(self.parent_buffer.inner_mut(), *id as usize, id_size);
}
// Write field offsets
for id in &field_ids_by_sorted_field_name {
let &offset = self.fields.get(id).unwrap();
- write_offset(
- &mut self.parent_buffer.0[pos..pos + offset_size as usize],
- offset,
- offset_size,
- );
- pos += offset_size as usize;
+ write_offset(self.parent_buffer.inner_mut(), offset, offset_size);
}
- write_offset(
- &mut self.parent_buffer.0[pos..pos + offset_size as usize],
- data_size,
- offset_size,
- );
- self.parent_buffer.0.extend_from_slice(&self.buffer.0);
+ write_offset(self.parent_buffer.inner_mut(), data_size, offset_size);
+
+ self.parent_buffer.append_slice(self.buffer.inner());
}
}