This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 13c43c4896 [Variant] Optimize the object header generation logic in 
ObjectBuilder::finish (#8031)
13c43c4896 is described below

commit 13c43c489628a051f2f00ed9ba97d853d34287a8
Author: Congxian Qiu <[email protected]>
AuthorDate: Sat Jan 10 04:13:21 2026 +0800

    [Variant] Optimize the object header generation logic in 
ObjectBuilder::finish (#8031)
    
    # Which issue does this PR close?
    
    This PR optimizes the logic of `ObjectBuilder::finish`.
    
    - Closes #7978.
    
    # Rationale for this change
    
    This PR optimizes the logic of `ObjectBuilder::finish`.
    
    # What changes are included in this PR?
    
    This PR optimizes `ObjectBuilder::finish` using a packed-u32 iterator
    
    # Are these changes tested?
    
    This PR is covered by existing tests.
    
    # Are there any user-facing changes?
    
    No
---
 parquet-variant-compute/Cargo.toml                 |   1 +
 parquet-variant-compute/benches/variant_kernels.rs | 147 +++++++++++++++++++++
 parquet-variant-json/src/from_json.rs              |   2 +-
 parquet-variant-json/src/lib.rs                    |   2 +-
 parquet-variant/src/builder.rs                     |  75 +----------
 parquet-variant/src/builder/list.rs                |   2 +-
 parquet-variant/src/builder/metadata.rs            |   2 +-
 parquet-variant/src/builder/object.rs              | 116 ++++++++++------
 8 files changed, 236 insertions(+), 111 deletions(-)

diff --git a/parquet-variant-compute/Cargo.toml 
b/parquet-variant-compute/Cargo.toml
index 74c3dd3fb7..85d66a9cf7 100644
--- a/parquet-variant-compute/Cargo.toml
+++ b/parquet-variant-compute/Cargo.toml
@@ -37,6 +37,7 @@ parquet-variant = { workspace = true }
 parquet-variant-json = { workspace = true }
 chrono = { workspace = true }
 uuid = { version = "1.18.0", features = ["v4"]}
+serde_json = "1.0"
 
 [lib]
 name = "parquet_variant_compute"
diff --git a/parquet-variant-compute/benches/variant_kernels.rs 
b/parquet-variant-compute/benches/variant_kernels.rs
index 13ff77d9fb..383697ab8c 100644
--- a/parquet-variant-compute/benches/variant_kernels.rs
+++ b/parquet-variant-compute/benches/variant_kernels.rs
@@ -23,12 +23,15 @@ use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES, 
Variant, VariantBuilder};
 use parquet_variant_compute::{
     GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, 
variant_get,
 };
+use parquet_variant_json::append_json;
 use rand::Rng;
 use rand::SeedableRng;
 use rand::distr::Alphanumeric;
 use rand::rngs::StdRng;
+use serde_json::Value;
 use std::fmt::Write;
 use std::sync::Arc;
+
 fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {
     let input_array = 
StringArray::from_iter_values(json_repeated_struct(8000));
     let array_ref: ArrayRef = Arc::new(input_array);
@@ -66,6 +69,58 @@ fn benchmark_batch_json_string_to_variant(c: &mut Criterion) 
{
         });
     });
 
+    let input_array = StringArray::from_iter_values(random_structure(8000, 
200));
+    let total_input_bytes = input_array
+        .iter()
+        .flatten() // filter None
+        .map(|v| v.len())
+        .sum::<usize>();
+    let id = format!(
+        "batch_json_string_to_variant object - 1 depth(200 fields) 
random_json({} bytes per document)",
+        total_input_bytes / input_array.len()
+    );
+    let array_ref: ArrayRef = Arc::new(input_array);
+    let string_array = 
array_ref.as_any().downcast_ref::<StringArray>().unwrap();
+    let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
+    for i in 0..string_array.len() {
+        json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
+    }
+    c.bench_function(&id, |b| {
+        b.iter(|| {
+            let mut variant_array_builder = 
VariantArrayBuilder::new(string_array.len());
+            for json in &json_array {
+                append_json(json, &mut variant_array_builder).unwrap();
+            }
+            let _ = variant_array_builder.build();
+        });
+    });
+
+    let input_array = StringArray::from_iter_values(random_structure(8000, 
100));
+    let total_input_bytes = input_array
+        .iter()
+        .flatten() // filter None
+        .map(|v| v.len())
+        .sum::<usize>();
+    let id = format!(
+        "batch_json_string_to_variant object - 1 depth(100 fields) 
random_json({} bytes per document)",
+        total_input_bytes / input_array.len()
+    );
+    let array_ref: ArrayRef = Arc::new(input_array);
+    let string_array = 
array_ref.as_any().downcast_ref::<StringArray>().unwrap();
+    let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
+    for i in 0..string_array.len() {
+        json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
+    }
+    c.bench_function(&id, |b| {
+        b.iter(|| {
+            let mut variant_array_builder = 
VariantArrayBuilder::new(string_array.len());
+            for json in &json_array {
+                append_json(json, &mut variant_array_builder).unwrap();
+            }
+            let _ = variant_array_builder.build();
+        });
+    });
+
     let input_array = 
StringArray::from_iter_values(random_json_structure(8000));
     let total_input_bytes = input_array
         .iter()
@@ -240,6 +295,22 @@ fn random_json_structure(count: usize) -> impl 
Iterator<Item = String> {
     (0..count).map(move |_| generator.next().to_string())
 }
 
+fn random_structure(count: usize, max_fields: usize) -> impl Iterator<Item = 
String> {
+    let mut generator = RandomJsonGenerator {
+        null_weight: 5,
+        string_weight: 25,
+        number_weight: 25,
+        boolean_weight: 10,
+        object_weight: 25,
+        array_weight: 0,
+        max_fields,
+        max_array_length: 0,
+        max_depth: 1,
+        ..Default::default()
+    };
+    (0..count).map(move |_| generator.next_object().to_string())
+}
+
 /// Creates JSON with random structure and fields.
 ///
 /// Each type is created in proportion controlled by the
@@ -299,6 +370,82 @@ impl RandomJsonGenerator {
         &self.output_buffer
     }
 
+    fn next_object(&mut self) -> &str {
+        self.output_buffer.clear();
+        self.append_random_json_for_object();
+        &self.output_buffer
+    }
+
+    fn append_random_json_for_object(&mut self) {
+        // use destructuring to ensure each field is used
+        let Self {
+            rng,
+            null_weight,
+            string_weight,
+            number_weight,
+            boolean_weight,
+            max_fields,
+            output_buffer,
+            ..
+        } = self;
+
+        write!(output_buffer, "{{").unwrap();
+        for i in 0..*max_fields {
+            let key_length = rng.random_range(1..=20);
+            let key: String = (0..key_length)
+                .map(|_| rng.sample(Alphanumeric) as char)
+                .collect();
+            write!(output_buffer, "\"{key}\":").unwrap();
+
+            let total_weight = *null_weight + *string_weight + *number_weight 
+ *boolean_weight;
+
+            // Generate a random number to determine the type
+            let mut random_value: usize = rng.random_range(0..total_weight);
+
+            if random_value <= *null_weight {
+                write!(output_buffer, "null").unwrap();
+            } else {
+                random_value -= *null_weight;
+
+                if random_value <= *string_weight {
+                    // Generate a random string between 1 and 20 characters
+                    let length = rng.random_range(1..=20);
+                    let random_string: String = (0..length)
+                        .map(|_| rng.sample(Alphanumeric) as char)
+                        .collect();
+                    write!(output_buffer, "\"{random_string}\"",).unwrap();
+                } else {
+                    random_value -= *string_weight;
+
+                    if random_value <= *number_weight {
+                        // 50% chance of generating an integer or a float
+                        if rng.random_bool(0.5) {
+                            // Generate a random integer
+                            let random_integer: i64 = 
rng.random_range(-1000..1000);
+                            write!(output_buffer, 
"{random_integer}",).unwrap();
+                        } else {
+                            // Generate a random float
+                            let random_float: f64 = 
rng.random_range(-1000.0..1000.0);
+                            write!(output_buffer, "{random_float}",).unwrap();
+                        }
+                    } else {
+                        random_value -= *number_weight;
+
+                        if random_value <= *boolean_weight {
+                            // Generate a random boolean
+                            let random_boolean: bool = rng.random();
+                            write!(output_buffer, 
"{random_boolean}",).unwrap();
+                        }
+                    }
+                }
+            }
+            if i < *max_fields - 1 {
+                write!(output_buffer, ",").unwrap();
+            }
+        }
+        write!(&mut self.output_buffer, "}}").unwrap();
+    }
+
     /// Appends a random JSON value to the output buffer.
     fn append_random_json(&mut self, current_depth: usize) {
         // use destructuring to ensure each field is used
diff --git a/parquet-variant-json/src/from_json.rs 
b/parquet-variant-json/src/from_json.rs
index 33e1b2e6db..4c22785ef1 100644
--- a/parquet-variant-json/src/from_json.rs
+++ b/parquet-variant-json/src/from_json.rs
@@ -102,7 +102,7 @@ fn variant_from_number<'m, 'v>(n: &Number) -> 
Result<Variant<'m, 'v>, ArrowError
     }
 }
 
-fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> 
Result<(), ArrowError> {
+pub fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> 
Result<(), ArrowError> {
     match json {
         Value::Null => builder.append_value(Variant::Null),
         Value::Bool(b) => builder.append_value(*b),
diff --git a/parquet-variant-json/src/lib.rs b/parquet-variant-json/src/lib.rs
index f24c740818..6b42b15bd4 100644
--- a/parquet-variant-json/src/lib.rs
+++ b/parquet-variant-json/src/lib.rs
@@ -34,5 +34,5 @@
 mod from_json;
 mod to_json;
 
-pub use from_json::JsonToVariant;
+pub use from_json::{JsonToVariant, append_json};
 pub use to_json::VariantToJson;
diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs
index 7094d935a5..e6122f062c 100644
--- a/parquet-variant/src/builder.rs
+++ b/parquet-variant/src/builder.rs
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use crate::decoder::{VariantBasicType, VariantPrimitiveType};
+use crate::decoder::{OffsetSizeBytes, VariantBasicType, VariantPrimitiveType};
 use crate::{
     ShortString, Variant, VariantDecimal4, VariantDecimal8, VariantDecimal16, 
VariantList,
     VariantMetadata, VariantObject,
@@ -43,21 +43,15 @@ fn short_string_header(len: usize) -> u8 {
     (len as u8) << 2 | VariantBasicType::ShortString as u8
 }
 
-pub(crate) fn int_size(v: usize) -> u8 {
+pub(crate) fn int_size(v: usize) -> OffsetSizeBytes {
     match v {
-        0..=0xFF => 1,
-        0x100..=0xFFFF => 2,
-        0x10000..=0xFFFFFF => 3,
-        _ => 4,
+        0..=0xFF => OffsetSizeBytes::One,
+        0x100..=0xFFFF => OffsetSizeBytes::Two,
+        0x10000..=0xFFFFFF => OffsetSizeBytes::Three,
+        _ => OffsetSizeBytes::Four,
     }
 }
 
-/// Write little-endian integer to buffer at a specific position
-fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: 
u8) {
-    let bytes = value.to_le_bytes();
-    buf[start_pos..start_pos + nbytes as 
usize].copy_from_slice(&bytes[..nbytes as usize]);
-}
-
 /// Wrapper around a `Vec<u8>` that provides methods for appending
 /// primitive values, variant types, and metadata.
 ///
@@ -358,63 +352,6 @@ impl ValueBuilder {
         );
         state.finish();
     }
-
-    /// Writes out the header byte for a variant object or list, from the 
starting position
-    /// of the builder, will return the position after this write
-    pub(crate) fn append_header_start_from_buf_pos(
-        &mut self,
-        start_pos: usize, // the start position where the header will be 
inserted
-        header_byte: u8,
-        is_large: bool,
-        num_fields: usize,
-    ) -> usize {
-        let buffer = self.inner_mut();
-
-        // Write header at the original start position
-        let mut header_pos = start_pos;
-
-        // Write header byte
-        buffer[header_pos] = header_byte;
-        header_pos += 1;
-
-        // Write number of fields
-        if is_large {
-            buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as 
u32).to_le_bytes());
-            header_pos += 4;
-        } else {
-            buffer[header_pos] = num_fields as u8;
-            header_pos += 1;
-        }
-
-        header_pos
-    }
-
-    /// Writes out the offsets for an array of offsets, including the final 
offset (data size).
-    /// from the starting position of the buffer, will return the position 
after this write
-    pub(crate) fn append_offset_array_start_from_buf_pos(
-        &mut self,
-        start_pos: usize,
-        offsets: impl IntoIterator<Item = usize>,
-        data_size: Option<usize>,
-        nbytes: u8,
-    ) -> usize {
-        let buf = self.inner_mut();
-
-        let mut current_pos = start_pos;
-        for relative_offset in offsets {
-            write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
-            current_pos += nbytes as usize;
-        }
-
-        // Write data_size
-        if let Some(data_size) = data_size {
-            // Write data_size at the end of the offsets
-            write_offset_at_pos(buf, current_pos, data_size, nbytes);
-            current_pos += nbytes as usize;
-        }
-
-        current_pos
-    }
 }
 
 /// A trait for managing state specific to different builder types.
diff --git a/parquet-variant/src/builder/list.rs 
b/parquet-variant/src/builder/list.rs
index 4c2682c50a..5064904ca7 100644
--- a/parquet-variant/src/builder/list.rs
+++ b/parquet-variant/src/builder/list.rs
@@ -174,7 +174,7 @@ impl<'a, S: BuilderSpecificState> ListBuilder<'a, S> {
         // Make sure to reserve enough capacity to handle the extra bytes 
we'll truncate.
         let mut bytes_to_splice = Vec::with_capacity(header_size + 3);
         // Write header
-        let header = array_header(is_large, offset_size);
+        let header = array_header(is_large, offset_size as _);
         bytes_to_splice.push(header);
 
         append_packed_u32(&mut bytes_to_splice, num_elements as u32, 
num_elements_size);
diff --git a/parquet-variant/src/builder/metadata.rs 
b/parquet-variant/src/builder/metadata.rs
index 10163ba3e0..efccc2e4c6 100644
--- a/parquet-variant/src/builder/metadata.rs
+++ b/parquet-variant/src/builder/metadata.rs
@@ -206,7 +206,7 @@ impl WritableMetadataBuilder {
 
         // Determine appropriate offset size based on the larger of dict size 
or total string size
         let max_offset = std::cmp::max(total_dict_size, nkeys);
-        let offset_size = int_size(max_offset);
+        let offset_size = int_size(max_offset) as u8;
 
         let offset_start = 1 + offset_size as usize;
         let string_start = offset_start + (nkeys + 1) * offset_size as usize;
diff --git a/parquet-variant/src/builder/object.rs 
b/parquet-variant/src/builder/object.rs
index ab04360c16..876c2e2d4c 100644
--- a/parquet-variant/src/builder/object.rs
+++ b/parquet-variant/src/builder/object.rs
@@ -24,14 +24,50 @@ use crate::{
 use arrow_schema::ArrowError;
 use indexmap::IndexMap;
 
-fn object_header(large: bool, id_size: u8, offset_size: u8) -> u8 {
-    let large_bit = if large { 1 } else { 0 };
-    (large_bit << (BASIC_TYPE_BITS + 4))
-        | ((id_size - 1) << (BASIC_TYPE_BITS + 2))
-        | ((offset_size - 1) << BASIC_TYPE_BITS)
+fn object_header<const LARGE_BIT: u8, const ID_SIZE: u8, const OFFSET_SIZE: 
u8>() -> u8 {
+    (LARGE_BIT << (BASIC_TYPE_BITS + 4))
+        | ((ID_SIZE - 1) << (BASIC_TYPE_BITS + 2))
+        | ((OFFSET_SIZE - 1) << BASIC_TYPE_BITS)
         | VariantBasicType::Object as u8
 }
 
+struct ObjectHeaderWriter<const OFFSET_SIZE: u8, const ID_SIZE: u8>();
+
+impl<const OFFSET_SIZE: u8, const ID_SIZE: u8> ObjectHeaderWriter<OFFSET_SIZE, 
ID_SIZE> {
+    fn write(
+        dst: &mut Vec<u8>,
+        num_fields: usize,
+        field_ids: impl Iterator<Item = u32>,
+        offsets: impl Iterator<Item = usize>,
+        data_size: usize,
+    ) {
+        let is_large = num_fields > u8::MAX as usize;
+        // num_fields will consume 4 bytes when it is larger than u8::MAX
+        if is_large {
+            dst.push(object_header::<1, { ID_SIZE }, { OFFSET_SIZE }>());
+            append_packed_u32::<4>(dst, num_fields);
+        } else {
+            dst.push(object_header::<0, { ID_SIZE }, { OFFSET_SIZE }>());
+            append_packed_u32::<1>(dst, num_fields);
+        }
+
+        for id in field_ids {
+            append_packed_u32::<ID_SIZE>(dst, id as usize);
+        }
+
+        for off in offsets {
+            append_packed_u32::<OFFSET_SIZE>(dst, off);
+        }
+
+        append_packed_u32::<OFFSET_SIZE>(dst, data_size);
+    }
+}
+
+#[inline(always)]
+fn append_packed_u32<const SIZE: u8>(dest: &mut Vec<u8>, value: usize) {
+    dest.extend_from_slice(&value.to_le_bytes()[..SIZE as usize]);
+}
+
 /// A builder for creating [`Variant::Object`] values.
 ///
 /// See the examples on [`VariantBuilder`] for usage.
@@ -245,41 +281,45 @@ impl<'a, S: BuilderSpecificState> ObjectBuilder<'a, S> {
             (num_fields * id_size as usize) + // field IDs
             ((num_fields + 1) * offset_size as usize); // field offsets + 
data_size
 
+        let mut bytes_to_splice = Vec::with_capacity(header_size);
+
+        macro_rules! write_header {
+            ($offset_size:expr, $id_size:expr) => {
+                ObjectHeaderWriter::<{ $offset_size as u8 }, { $id_size as u8 
}>::write(
+                    &mut bytes_to_splice,
+                    num_fields,
+                    self.fields.keys().copied(),
+                    self.fields.values().copied(),
+                    data_size,
+                )
+            };
+        }
+
+        use crate::decoder::OffsetSizeBytes::*;
+        match (offset_size, id_size) {
+            (One, One) => write_header!(One, One),
+            (One, Two) => write_header!(One, Two),
+            (One, Three) => write_header!(One, Three),
+            (One, Four) => write_header!(One, Four),
+            (Two, One) => write_header!(Two, One),
+            (Two, Two) => write_header!(Two, Two),
+            (Two, Three) => write_header!(Two, Three),
+            (Two, Four) => write_header!(Two, Four),
+            (Three, One) => write_header!(Three, One),
+            (Three, Two) => write_header!(Three, Two),
+            (Three, Three) => write_header!(Three, Three),
+            (Three, Four) => write_header!(Three, Four),
+            (Four, One) => write_header!(Four, One),
+            (Four, Two) => write_header!(Four, Two),
+            (Four, Three) => write_header!(Four, Three),
+            (Four, Four) => write_header!(Four, Four),
+        }
+
         // Shift existing data to make room for the header
-        value_builder.inner_mut().splice(
-            starting_offset..starting_offset,
-            std::iter::repeat_n(0u8, header_size),
-        );
+        value_builder
+            .inner_mut()
+            .splice(starting_offset..starting_offset, bytes_to_splice);
 
-        // Write header at the original start position
-        let mut header_pos = starting_offset;
-
-        // Write header byte
-        let header = object_header(is_large, id_size, offset_size);
-
-        header_pos = self
-            .parent_state
-            .value_builder()
-            .append_header_start_from_buf_pos(header_pos, header, is_large, 
num_fields);
-
-        header_pos = self
-            .parent_state
-            .value_builder()
-            .append_offset_array_start_from_buf_pos(
-                header_pos,
-                self.fields.keys().copied().map(|id| id as usize),
-                None,
-                id_size,
-            );
-
-        self.parent_state
-            .value_builder()
-            .append_offset_array_start_from_buf_pos(
-                header_pos,
-                self.fields.values().copied(),
-                Some(data_size),
-                offset_size,
-            );
         self.parent_state.finish();
     }
 }

Reply via email to