This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e42df82a6d [Variant] Simplify `Builder` buffer operations (#7795)
e42df82a6d is described below

commit e42df82a6d1e52f0dbdfa87130c2974ce460af69
Author: Matthew Kim <[email protected]>
AuthorDate: Fri Jun 27 07:07:41 2025 -0400

    [Variant] Simplify `Builder` buffer operations (#7795)
    
    # Rationale for this change
    
    This PR simplifies how we build up the internal `ValueBuffer` by
    appending to it directly, rather than pre-allocating a buffer filled
    with zeroes and writing values at computed indices. This avoids
    indexing math that can be hard to follow and reason about.
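    
    As a rough sketch of the difference (illustrative only: the values and
    the 0xAB header byte below are made up, but the append pattern matches
    the new `write_offset` in this diff):
    
    ```rust
    fn main() {
        let value: usize = 0x0102;
        let nbytes = 2;
    
        // Old style: pre-size with zeroes, then write at computed indices.
        let mut old = vec![0u8; 1 + nbytes];
        old[0] = 0xAB; // placeholder header byte
        for i in 0..nbytes {
            old[1 + i] = (value >> (i * 8)) as u8;
        }
    
        // New style: append in order and let the Vec grow as needed.
        let mut new = Vec::with_capacity(1 + nbytes);
        new.push(0xAB); // same placeholder header byte
        new.extend_from_slice(&value.to_le_bytes()[..nbytes]);
    
        assert_eq!(old, new);
    }
    ```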
    
    This PR also gives `ValueBuffer` a well-defined API: callers should
    not touch the inner `Vec<u8>` directly, since it is easy to leave it
    in an inconsistent state.
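    
    A minimal sketch of that wrapper idea (the method names mirror the
    ones added in this diff, but this is a standalone illustration, not
    the real type; the header byte in `main` is a placeholder):
    
    ```rust
    /// Callers go through append_* methods instead of reaching into the
    /// inner Vec<u8> directly.
    #[derive(Default)]
    struct ValueBuffer(Vec<u8>);
    
    impl ValueBuffer {
        fn append_u8(&mut self, byte: u8) {
            self.0.push(byte);
        }
    
        fn append_slice(&mut self, other: &[u8]) {
            self.0.extend_from_slice(other);
        }
    
        fn into_inner(self) -> Vec<u8> {
            self.0
        }
    }
    
    fn main() {
        let mut buf = ValueBuffer::default();
        buf.append_u8(0xAB); // placeholder header byte
        buf.append_slice(&42i32.to_le_bytes());
        assert_eq!(buf.into_inner().len(), 5);
    }
    ```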
    
    # Are there any user-facing changes?
    
    Nope!
---
 parquet-variant/src/builder.rs | 313 ++++++++++++++++++-----------------------
 1 file changed, 139 insertions(+), 174 deletions(-)

diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs
index 74f8bf2a68..7f26b3279e 100644
--- a/parquet-variant/src/builder.rs
+++ b/parquet-variant/src/builder.rs
@@ -54,30 +54,54 @@ fn int_size(v: usize) -> u8 {
 }
 
 /// Write little-endian integer to buffer
-fn write_offset(buf: &mut [u8], value: usize, nbytes: u8) {
-    for i in 0..nbytes {
-        buf[i as usize] = (value >> (i * 8)) as u8;
-    }
+fn write_offset(buf: &mut Vec<u8>, value: usize, nbytes: u8) {
+    let bytes = value.to_le_bytes();
+    buf.extend_from_slice(&bytes[..nbytes as usize]);
 }
 
-/// Helper to make room for header by moving data
-fn make_room_for_header(buffer: &mut Vec<u8>, start_pos: usize, header_size: usize) {
-    let current_len = buffer.len();
-    buffer.resize(current_len + header_size, 0);
-
-    let src_start = start_pos;
-    let src_end = current_len;
-    let dst_start = start_pos + header_size;
+fn write_header(buf: &mut Vec<u8>, header_byte: u8, is_large: bool, num_items: usize) {
+    buf.push(header_byte);
 
-    buffer.copy_within(src_start..src_end, dst_start);
+    if is_large {
+        let num_items = num_items as u32;
+        buf.extend_from_slice(&num_items.to_le_bytes());
+    } else {
+        let num_items = num_items as u8;
+        buf.push(num_items);
+    };
 }
-
 #[derive(Default)]
 struct ValueBuffer(Vec<u8>);
 
 impl ValueBuffer {
+    fn append_u8(&mut self, term: u8) {
+        self.0.push(term);
+    }
+
+    fn append_slice(&mut self, other: &[u8]) {
+        self.0.extend_from_slice(other);
+    }
+
+    fn append_primitive_header(&mut self, primitive_type: VariantPrimitiveType) {
+        self.0.push(primitive_header(primitive_type));
+    }
+
+    fn inner(&self) -> &[u8] {
+        &self.0
+    }
+
+    fn into_inner(self) -> Vec<u8> {
+        self.0
+    }
+
+    fn inner_mut(&mut self) -> &mut Vec<u8> {
+        &mut self.0
+    }
+
+    // Variant types below
+
     fn append_null(&mut self) {
-        self.0.push(primitive_header(VariantPrimitiveType::Null));
+        self.append_primitive_header(VariantPrimitiveType::Null);
     }
 
     fn append_bool(&mut self, value: bool) {
@@ -86,98 +110,91 @@ impl ValueBuffer {
         } else {
             VariantPrimitiveType::BooleanFalse
         };
-        self.0.push(primitive_header(primitive_type));
+        self.append_primitive_header(primitive_type);
     }
 
     fn append_int8(&mut self, value: i8) {
-        self.0.push(primitive_header(VariantPrimitiveType::Int8));
-        self.0.push(value as u8);
+        self.append_primitive_header(VariantPrimitiveType::Int8);
+        self.append_u8(value as u8);
     }
 
     fn append_int16(&mut self, value: i16) {
-        self.0.push(primitive_header(VariantPrimitiveType::Int16));
-        self.0.extend_from_slice(&value.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Int16);
+        self.append_slice(&value.to_le_bytes());
     }
 
     fn append_int32(&mut self, value: i32) {
-        self.0.push(primitive_header(VariantPrimitiveType::Int32));
-        self.0.extend_from_slice(&value.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Int32);
+        self.append_slice(&value.to_le_bytes());
     }
 
     fn append_int64(&mut self, value: i64) {
-        self.0.push(primitive_header(VariantPrimitiveType::Int64));
-        self.0.extend_from_slice(&value.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Int64);
+        self.append_slice(&value.to_le_bytes());
     }
 
     fn append_float(&mut self, value: f32) {
-        self.0.push(primitive_header(VariantPrimitiveType::Float));
-        self.0.extend_from_slice(&value.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Float);
+        self.append_slice(&value.to_le_bytes());
     }
 
     fn append_double(&mut self, value: f64) {
-        self.0.push(primitive_header(VariantPrimitiveType::Double));
-        self.0.extend_from_slice(&value.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Double);
+        self.append_slice(&value.to_le_bytes());
     }
 
     fn append_date(&mut self, value: chrono::NaiveDate) {
-        self.0.push(primitive_header(VariantPrimitiveType::Date));
+        self.append_primitive_header(VariantPrimitiveType::Date);
         let days_since_epoch = value.signed_duration_since(UNIX_EPOCH_DATE).num_days() as i32;
-        self.0.extend_from_slice(&days_since_epoch.to_le_bytes());
+        self.append_slice(&days_since_epoch.to_le_bytes());
     }
 
     fn append_timestamp_micros(&mut self, value: chrono::DateTime<chrono::Utc>) {
-        self.0
-            .push(primitive_header(VariantPrimitiveType::TimestampMicros));
+        self.append_primitive_header(VariantPrimitiveType::TimestampMicros);
         let micros = value.timestamp_micros();
-        self.0.extend_from_slice(&micros.to_le_bytes());
+        self.append_slice(&micros.to_le_bytes());
     }
 
     fn append_timestamp_ntz_micros(&mut self, value: chrono::NaiveDateTime) {
-        self.0
-            .push(primitive_header(VariantPrimitiveType::TimestampNtzMicros));
+        self.append_primitive_header(VariantPrimitiveType::TimestampNtzMicros);
         let micros = value.and_utc().timestamp_micros();
-        self.0.extend_from_slice(&micros.to_le_bytes());
+        self.append_slice(&micros.to_le_bytes());
     }
 
     fn append_decimal4(&mut self, integer: i32, scale: u8) {
-        self.0
-            .push(primitive_header(VariantPrimitiveType::Decimal4));
-        self.0.push(scale);
-        self.0.extend_from_slice(&integer.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Decimal4);
+        self.append_u8(scale);
+        self.append_slice(&integer.to_le_bytes());
     }
 
     fn append_decimal8(&mut self, integer: i64, scale: u8) {
-        self.0
-            .push(primitive_header(VariantPrimitiveType::Decimal8));
-        self.0.push(scale);
-        self.0.extend_from_slice(&integer.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Decimal8);
+        self.append_u8(scale);
+        self.append_slice(&integer.to_le_bytes());
     }
 
     fn append_decimal16(&mut self, integer: i128, scale: u8) {
-        self.0
-            .push(primitive_header(VariantPrimitiveType::Decimal16));
-        self.0.push(scale);
-        self.0.extend_from_slice(&integer.to_le_bytes());
+        self.append_primitive_header(VariantPrimitiveType::Decimal16);
+        self.append_u8(scale);
+        self.append_slice(&integer.to_le_bytes());
     }
 
     fn append_binary(&mut self, value: &[u8]) {
-        self.0.push(primitive_header(VariantPrimitiveType::Binary));
-        self.0
-            .extend_from_slice(&(value.len() as u32).to_le_bytes());
-        self.0.extend_from_slice(value);
+        self.append_primitive_header(VariantPrimitiveType::Binary);
+        self.append_slice(&(value.len() as u32).to_le_bytes());
+        self.append_slice(value);
     }
 
     fn append_short_string(&mut self, value: ShortString) {
         let inner = value.0;
-        self.0.push(short_string_header(inner.len()));
-        self.0.extend_from_slice(inner.as_bytes());
+        self.append_u8(short_string_header(inner.len()));
+        self.append_slice(inner.as_bytes());
     }
 
     fn append_string(&mut self, value: &str) {
-        self.0.push(primitive_header(VariantPrimitiveType::String));
-        self.0
-            .extend_from_slice(&(value.len() as u32).to_le_bytes());
-        self.0.extend_from_slice(value.as_bytes());
+        self.append_primitive_header(VariantPrimitiveType::String);
+        self.append_slice(&(value.len() as u32).to_le_bytes());
+        self.append_slice(value.as_bytes());
     }
 
     fn offset(&self) -> usize {
@@ -227,8 +244,8 @@ struct MetadataBuilder {
 }
 
 impl MetadataBuilder {
-    /// Add field name to dictionary, return its ID
-    fn add_field_name(&mut self, field_name: &str) -> u32 {
+    /// Upsert field name to dictionary, return its ID
+    fn upsert_field_name(&mut self, field_name: &str) -> u32 {
         use std::collections::btree_map::Entry;
         match self.field_name_to_id.entry(field_name.to_string()) {
             Entry::Occupied(entry) => *entry.get(),
@@ -248,6 +265,45 @@ impl MetadataBuilder {
     fn metadata_size(&self) -> usize {
         self.field_names.iter().map(|k| k.len()).sum()
     }
+
+    fn finish(self) -> Vec<u8> {
+        let nkeys = self.num_field_names();
+
+        // Calculate metadata size
+        let total_dict_size: usize = self.metadata_size();
+
+        // Determine appropriate offset size based on the larger of dict size or total string size
+        let max_offset = std::cmp::max(total_dict_size, nkeys);
+        let offset_size = int_size(max_offset);
+
+        let offset_start = 1 + offset_size as usize;
+        let string_start = offset_start + (nkeys + 1) * offset_size as usize;
+        let metadata_size = string_start + total_dict_size;
+
+        let mut metadata = Vec::with_capacity(metadata_size);
+
+        // Write header: version=1, not sorted, with calculated offset_size
+        metadata.push(0x01 | ((offset_size - 1) << 6));
+
+        // Write dictionary size
+        write_offset(&mut metadata, nkeys, offset_size);
+
+        // Write offsets
+        let mut cur_offset = 0;
+        for key in self.field_names.iter() {
+            write_offset(&mut metadata, cur_offset, offset_size);
+            cur_offset += key.len();
+        }
+        // Write final offset
+        write_offset(&mut metadata, cur_offset, offset_size);
+
+        // Write string data
+        for key in self.field_names.iter() {
+            metadata.extend_from_slice(key.as_bytes());
+        }
+
+        metadata
+    }
 }
 
 /// Top level builder for [`Variant`] values
@@ -388,6 +444,7 @@ impl MetadataBuilder {
 /// );
 ///
 /// ```
+#[derive(Default)]
 pub struct VariantBuilder {
     buffer: ValueBuffer,
     metadata_builder: MetadataBuilder,
@@ -420,54 +477,7 @@ impl VariantBuilder {
     }
 
     pub fn finish(self) -> (Vec<u8>, Vec<u8>) {
-        let nkeys = self.metadata_builder.num_field_names();
-
-        // Calculate metadata size
-        let total_dict_size: usize = self.metadata_builder.metadata_size();
-
-        // Determine appropriate offset size based on the larger of dict size or total string size
-        let max_offset = std::cmp::max(total_dict_size, nkeys);
-        let offset_size = int_size(max_offset);
-
-        let offset_start = 1 + offset_size as usize;
-        let string_start = offset_start + (nkeys + 1) * offset_size as usize;
-        let metadata_size = string_start + total_dict_size;
-
-        // Pre-allocate exact size to avoid reallocations
-        let mut metadata = vec![0u8; metadata_size];
-
-        // Write header: version=1, not sorted, with calculated offset_size
-        metadata[0] = 0x01 | ((offset_size - 1) << 6);
-
-        // Write dictionary size
-        write_offset(&mut metadata[1..], nkeys, offset_size);
-
-        // Write offsets and string data
-        let mut cur_offset = 0;
-        for (i, key) in self.metadata_builder.field_names.iter().enumerate() {
-            write_offset(
-                &mut metadata[offset_start + i * offset_size as usize..],
-                cur_offset,
-                offset_size,
-            );
-            let start = string_start + cur_offset;
-            metadata[start..start + key.len()].copy_from_slice(key.as_bytes());
-            cur_offset += key.len();
-        }
-        // Write final offset
-        write_offset(
-            &mut metadata[offset_start + nkeys * offset_size as usize..],
-            cur_offset,
-            offset_size,
-        );
-
-        (metadata, self.buffer.0)
-    }
-}
-
-impl Default for VariantBuilder {
-    fn default() -> Self {
-        Self::new()
+        (self.metadata_builder.finish(), self.buffer.into_inner())
     }
 }
 
@@ -537,40 +547,23 @@ impl<'a> ListBuilder<'a> {
         let data_size = self.buffer.offset();
         let num_elements = self.offsets.len() - 1;
         let is_large = num_elements > u8::MAX as usize;
-        let size_bytes = if is_large { 4 } else { 1 };
         let offset_size = int_size(data_size);
-        let header_size = 1 + size_bytes + (num_elements + 1) * offset_size as usize;
-
-        let parent_start_pos = self.parent_buffer.offset();
-
-        make_room_for_header(&mut self.parent_buffer.0, parent_start_pos, header_size);
 
         // Write header
-        let mut pos = parent_start_pos;
-        self.parent_buffer.0[pos] = array_header(is_large, offset_size);
-        pos += 1;
-
-        if is_large {
-            self.parent_buffer.0[pos..pos + 4]
-                .copy_from_slice(&(num_elements as u32).to_le_bytes());
-            pos += 4;
-        } else {
-            self.parent_buffer.0[pos] = num_elements as u8;
-            pos += 1;
-        }
+        write_header(
+            self.parent_buffer.inner_mut(),
+            array_header(is_large, offset_size),
+            is_large,
+            num_elements,
+        );
 
         // Write offsets
         for offset in &self.offsets {
-            write_offset(
-                &mut self.parent_buffer.0[pos..pos + offset_size as usize],
-                *offset,
-                offset_size,
-            );
-            pos += offset_size as usize;
+            write_offset(self.parent_buffer.inner_mut(), *offset, offset_size);
         }
 
         // Append values
-        self.parent_buffer.0.extend_from_slice(&self.buffer.0);
+        self.parent_buffer.append_slice(self.buffer.inner());
     }
 }
 
@@ -602,7 +595,7 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
             return;
         };
 
-        let field_id = self.metadata_builder.add_field_name(field_name);
+        let field_id = self.metadata_builder.upsert_field_name(field_name);
         self.fields.insert(field_id, *field_start);
 
         self.pending = None;
@@ -615,7 +608,7 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
     pub fn insert<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, key: &str, value: T) {
         self.check_pending_field();
 
-        let field_id = self.metadata_builder.add_field_name(key);
+        let field_id = self.metadata_builder.upsert_field_name(key);
         let field_start = self.buffer.offset();
 
         self.fields.insert(field_id, field_start);
@@ -655,7 +648,6 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
         let data_size = self.buffer.offset();
         let num_fields = self.fields.len();
         let is_large = num_fields > u8::MAX as usize;
-        let size_bytes = if is_large { 4 } else { 1 };
 
         let field_ids_by_sorted_field_name = self
             .metadata_builder
@@ -669,55 +661,28 @@ impl<'a, 'b> ObjectBuilder<'a, 'b> {
         let id_size = int_size(max_id);
         let offset_size = int_size(data_size);
 
-        let header_size = 1
-            + size_bytes
-            + num_fields * id_size as usize
-            + (num_fields + 1) * offset_size as usize;
-
-        let parent_start_pos = self.parent_buffer.offset();
-
-        make_room_for_header(&mut self.parent_buffer.0, parent_start_pos, header_size);
-
         // Write header
-        let mut pos = parent_start_pos;
-        self.parent_buffer.0[pos] = object_header(is_large, id_size, offset_size);
-        pos += 1;
-
-        if is_large {
-            self.parent_buffer.0[pos..pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
-            pos += 4;
-        } else {
-            self.parent_buffer.0[pos] = num_fields as u8;
-            pos += 1;
-        }
+        write_header(
+            self.parent_buffer.inner_mut(),
+            object_header(is_large, id_size, offset_size),
+            is_large,
+            num_fields,
+        );
 
         // Write field IDs (sorted order)
         for id in &field_ids_by_sorted_field_name {
-            write_offset(
-                &mut self.parent_buffer.0[pos..pos + id_size as usize],
-                *id as usize,
-                id_size,
-            );
-            pos += id_size as usize;
+            write_offset(self.parent_buffer.inner_mut(), *id as usize, id_size);
         }
 
         // Write field offsets
         for id in &field_ids_by_sorted_field_name {
             let &offset = self.fields.get(id).unwrap();
-            write_offset(
-                &mut self.parent_buffer.0[pos..pos + offset_size as usize],
-                offset,
-                offset_size,
-            );
-            pos += offset_size as usize;
+            write_offset(self.parent_buffer.inner_mut(), offset, offset_size);
         }
-        write_offset(
-            &mut self.parent_buffer.0[pos..pos + offset_size as usize],
-            data_size,
-            offset_size,
-        );
 
-        self.parent_buffer.0.extend_from_slice(&self.buffer.0);
+        write_offset(self.parent_buffer.inner_mut(), data_size, offset_size);
+
+        self.parent_buffer.append_slice(self.buffer.inner());
     }
 }
 
