This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 13c43c4896 [Variant] Optimize the object header generation logic in
ObjectBuilder::finish (#8031)
13c43c4896 is described below
commit 13c43c489628a051f2f00ed9ba97d853d34287a8
Author: Congxian Qiu <[email protected]>
AuthorDate: Sat Jan 10 04:13:21 2026 +0800
[Variant] Optimize the object header generation logic in
ObjectBuilder::finish (#8031)
# Which issue does this PR close?
This PR optimizes the logic of `ObjectBuilder::finish`.
- Closes #7978.
# Rationale for this change
This PR optimizes the logic of `ObjectBuilder::finish`.
# What changes are included in this PR?
This PR optimizes `ObjectBuilder::finish` with a packed u32 iterator.
# Are these changes tested?
This PR is covered by existing tests.
# Are there any user-facing changes?
No
---
parquet-variant-compute/Cargo.toml | 1 +
parquet-variant-compute/benches/variant_kernels.rs | 147 +++++++++++++++++++++
parquet-variant-json/src/from_json.rs | 2 +-
parquet-variant-json/src/lib.rs | 2 +-
parquet-variant/src/builder.rs | 75 +----------
parquet-variant/src/builder/list.rs | 2 +-
parquet-variant/src/builder/metadata.rs | 2 +-
parquet-variant/src/builder/object.rs | 116 ++++++++++------
8 files changed, 236 insertions(+), 111 deletions(-)
diff --git a/parquet-variant-compute/Cargo.toml
b/parquet-variant-compute/Cargo.toml
index 74c3dd3fb7..85d66a9cf7 100644
--- a/parquet-variant-compute/Cargo.toml
+++ b/parquet-variant-compute/Cargo.toml
@@ -37,6 +37,7 @@ parquet-variant = { workspace = true }
parquet-variant-json = { workspace = true }
chrono = { workspace = true }
uuid = { version = "1.18.0", features = ["v4"]}
+serde_json = "1.0"
[lib]
name = "parquet_variant_compute"
diff --git a/parquet-variant-compute/benches/variant_kernels.rs
b/parquet-variant-compute/benches/variant_kernels.rs
index 13ff77d9fb..383697ab8c 100644
--- a/parquet-variant-compute/benches/variant_kernels.rs
+++ b/parquet-variant-compute/benches/variant_kernels.rs
@@ -23,12 +23,15 @@ use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES,
Variant, VariantBuilder};
use parquet_variant_compute::{
GetOptions, VariantArray, VariantArrayBuilder, json_to_variant,
variant_get,
};
+use parquet_variant_json::append_json;
use rand::Rng;
use rand::SeedableRng;
use rand::distr::Alphanumeric;
use rand::rngs::StdRng;
+use serde_json::Value;
use std::fmt::Write;
use std::sync::Arc;
+
fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {
let input_array =
StringArray::from_iter_values(json_repeated_struct(8000));
let array_ref: ArrayRef = Arc::new(input_array);
@@ -66,6 +69,58 @@ fn benchmark_batch_json_string_to_variant(c: &mut Criterion)
{
});
});
+ let input_array = StringArray::from_iter_values(random_structure(8000,
200));
+ let total_input_bytes = input_array
+ .iter()
+ .flatten() // filter None
+ .map(|v| v.len())
+ .sum::<usize>();
+ let id = format!(
+ "batch_json_string_to_variant object - 1 depth(200 fields)
random_json({} bytes per document)",
+ total_input_bytes / input_array.len()
+ );
+ let array_ref: ArrayRef = Arc::new(input_array);
+ let string_array =
array_ref.as_any().downcast_ref::<StringArray>().unwrap();
+ let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
+ for i in 0..string_array.len() {
+ json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
+ }
+ c.bench_function(&id, |b| {
+ b.iter(|| {
+ let mut variant_array_builder =
VariantArrayBuilder::new(string_array.len());
+ for json in &json_array {
+ append_json(json, &mut variant_array_builder).unwrap();
+ }
+ let _ = variant_array_builder.build();
+ });
+ });
+
+ let input_array = StringArray::from_iter_values(random_structure(8000,
100));
+ let total_input_bytes = input_array
+ .iter()
+ .flatten() // filter None
+ .map(|v| v.len())
+ .sum::<usize>();
+ let id = format!(
+ "batch_json_string_to_variant object - 1 depth(100 fields)
random_json({} bytes per document)",
+ total_input_bytes / input_array.len()
+ );
+ let array_ref: ArrayRef = Arc::new(input_array);
+ let string_array =
array_ref.as_any().downcast_ref::<StringArray>().unwrap();
+ let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
+ for i in 0..string_array.len() {
+ json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
+ }
+ c.bench_function(&id, |b| {
+ b.iter(|| {
+ let mut variant_array_builder =
VariantArrayBuilder::new(string_array.len());
+ for json in &json_array {
+ append_json(json, &mut variant_array_builder).unwrap();
+ }
+ let _ = variant_array_builder.build();
+ });
+ });
+
let input_array =
StringArray::from_iter_values(random_json_structure(8000));
let total_input_bytes = input_array
.iter()
@@ -240,6 +295,22 @@ fn random_json_structure(count: usize) -> impl
Iterator<Item = String> {
(0..count).map(move |_| generator.next().to_string())
}
+fn random_structure(count: usize, max_fields: usize) -> impl Iterator<Item =
String> {
+ let mut generator = RandomJsonGenerator {
+ null_weight: 5,
+ string_weight: 25,
+ number_weight: 25,
+ boolean_weight: 10,
+ object_weight: 25,
+ array_weight: 0,
+ max_fields,
+ max_array_length: 0,
+ max_depth: 1,
+ ..Default::default()
+ };
+ (0..count).map(move |_| generator.next_object().to_string())
+}
+
/// Creates JSON with random structure and fields.
///
/// Each type is created in proportion controlled by the
@@ -299,6 +370,82 @@ impl RandomJsonGenerator {
&self.output_buffer
}
+ fn next_object(&mut self) -> &str {
+ self.output_buffer.clear();
+ self.append_random_json_for_object();
+ &self.output_buffer
+ }
+
+ fn append_random_json_for_object(&mut self) {
+ // use destructuring to ensure each field is used
+ let Self {
+ rng,
+ null_weight,
+ string_weight,
+ number_weight,
+ boolean_weight,
+ max_fields,
+ output_buffer,
+ ..
+ } = self;
+
+ write!(output_buffer, "{{").unwrap();
+ for i in 0..*max_fields {
+ let key_length = rng.random_range(1..=20);
+ let key: String = (0..key_length)
+ .map(|_| rng.sample(Alphanumeric) as char)
+ .collect();
+ write!(output_buffer, "\"{key}\":").unwrap();
+
+ let total_weight = *null_weight + *string_weight + *number_weight
+ *boolean_weight;
+
+ // Generate a random number to determine the type
+ let mut random_value: usize = rng.random_range(0..total_weight);
+
+ if random_value <= *null_weight {
+ write!(output_buffer, "null").unwrap();
+ } else {
+ random_value -= *null_weight;
+
+ if random_value <= *string_weight {
+ // Generate a random string between 1 and 20 characters
+ let length = rng.random_range(1..=20);
+ let random_string: String = (0..length)
+ .map(|_| rng.sample(Alphanumeric) as char)
+ .collect();
+ write!(output_buffer, "\"{random_string}\"",).unwrap();
+ } else {
+ random_value -= *string_weight;
+
+ if random_value <= *number_weight {
+ // 50% chance of generating an integer or a float
+ if rng.random_bool(0.5) {
+ // Generate a random integer
+ let random_integer: i64 =
rng.random_range(-1000..1000);
+ write!(output_buffer,
"{random_integer}",).unwrap();
+ } else {
+ // Generate a random float
+ let random_float: f64 =
rng.random_range(-1000.0..1000.0);
+ write!(output_buffer, "{random_float}",).unwrap();
+ }
+ } else {
+ random_value -= *number_weight;
+
+ if random_value <= *boolean_weight {
+ // Generate a random boolean
+ let random_boolean: bool = rng.random();
+ write!(output_buffer,
"{random_boolean}",).unwrap();
+ }
+ }
+ }
+ }
+ if i < *max_fields - 1 {
+ write!(output_buffer, ",").unwrap();
+ }
+ }
+ write!(&mut self.output_buffer, "}}").unwrap();
+ }
+
/// Appends a random JSON value to the output buffer.
fn append_random_json(&mut self, current_depth: usize) {
// use destructuring to ensure each field is used
diff --git a/parquet-variant-json/src/from_json.rs
b/parquet-variant-json/src/from_json.rs
index 33e1b2e6db..4c22785ef1 100644
--- a/parquet-variant-json/src/from_json.rs
+++ b/parquet-variant-json/src/from_json.rs
@@ -102,7 +102,7 @@ fn variant_from_number<'m, 'v>(n: &Number) ->
Result<Variant<'m, 'v>, ArrowError
}
}
-fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) ->
Result<(), ArrowError> {
+pub fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) ->
Result<(), ArrowError> {
match json {
Value::Null => builder.append_value(Variant::Null),
Value::Bool(b) => builder.append_value(*b),
diff --git a/parquet-variant-json/src/lib.rs b/parquet-variant-json/src/lib.rs
index f24c740818..6b42b15bd4 100644
--- a/parquet-variant-json/src/lib.rs
+++ b/parquet-variant-json/src/lib.rs
@@ -34,5 +34,5 @@
mod from_json;
mod to_json;
-pub use from_json::JsonToVariant;
+pub use from_json::{JsonToVariant, append_json};
pub use to_json::VariantToJson;
diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs
index 7094d935a5..e6122f062c 100644
--- a/parquet-variant/src/builder.rs
+++ b/parquet-variant/src/builder.rs
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use crate::decoder::{VariantBasicType, VariantPrimitiveType};
+use crate::decoder::{OffsetSizeBytes, VariantBasicType, VariantPrimitiveType};
use crate::{
ShortString, Variant, VariantDecimal4, VariantDecimal8, VariantDecimal16,
VariantList,
VariantMetadata, VariantObject,
@@ -43,21 +43,15 @@ fn short_string_header(len: usize) -> u8 {
(len as u8) << 2 | VariantBasicType::ShortString as u8
}
-pub(crate) fn int_size(v: usize) -> u8 {
+pub(crate) fn int_size(v: usize) -> OffsetSizeBytes {
match v {
- 0..=0xFF => 1,
- 0x100..=0xFFFF => 2,
- 0x10000..=0xFFFFFF => 3,
- _ => 4,
+ 0..=0xFF => OffsetSizeBytes::One,
+ 0x100..=0xFFFF => OffsetSizeBytes::Two,
+ 0x10000..=0xFFFFFF => OffsetSizeBytes::Three,
+ _ => OffsetSizeBytes::Four,
}
}
-/// Write little-endian integer to buffer at a specific position
-fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes:
u8) {
- let bytes = value.to_le_bytes();
- buf[start_pos..start_pos + nbytes as
usize].copy_from_slice(&bytes[..nbytes as usize]);
-}
-
/// Wrapper around a `Vec<u8>` that provides methods for appending
/// primitive values, variant types, and metadata.
///
@@ -358,63 +352,6 @@ impl ValueBuilder {
);
state.finish();
}
-
- /// Writes out the header byte for a variant object or list, from the
starting position
- /// of the builder, will return the position after this write
- pub(crate) fn append_header_start_from_buf_pos(
- &mut self,
- start_pos: usize, // the start position where the header will be
inserted
- header_byte: u8,
- is_large: bool,
- num_fields: usize,
- ) -> usize {
- let buffer = self.inner_mut();
-
- // Write header at the original start position
- let mut header_pos = start_pos;
-
- // Write header byte
- buffer[header_pos] = header_byte;
- header_pos += 1;
-
- // Write number of fields
- if is_large {
- buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as
u32).to_le_bytes());
- header_pos += 4;
- } else {
- buffer[header_pos] = num_fields as u8;
- header_pos += 1;
- }
-
- header_pos
- }
-
- /// Writes out the offsets for an array of offsets, including the final
offset (data size).
- /// from the starting position of the buffer, will return the position
after this write
- pub(crate) fn append_offset_array_start_from_buf_pos(
- &mut self,
- start_pos: usize,
- offsets: impl IntoIterator<Item = usize>,
- data_size: Option<usize>,
- nbytes: u8,
- ) -> usize {
- let buf = self.inner_mut();
-
- let mut current_pos = start_pos;
- for relative_offset in offsets {
- write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
- current_pos += nbytes as usize;
- }
-
- // Write data_size
- if let Some(data_size) = data_size {
- // Write data_size at the end of the offsets
- write_offset_at_pos(buf, current_pos, data_size, nbytes);
- current_pos += nbytes as usize;
- }
-
- current_pos
- }
}
/// A trait for managing state specific to different builder types.
diff --git a/parquet-variant/src/builder/list.rs
b/parquet-variant/src/builder/list.rs
index 4c2682c50a..5064904ca7 100644
--- a/parquet-variant/src/builder/list.rs
+++ b/parquet-variant/src/builder/list.rs
@@ -174,7 +174,7 @@ impl<'a, S: BuilderSpecificState> ListBuilder<'a, S> {
// Make sure to reserve enough capacity to handle the extra bytes
we'll truncate.
let mut bytes_to_splice = Vec::with_capacity(header_size + 3);
// Write header
- let header = array_header(is_large, offset_size);
+ let header = array_header(is_large, offset_size as _);
bytes_to_splice.push(header);
append_packed_u32(&mut bytes_to_splice, num_elements as u32,
num_elements_size);
diff --git a/parquet-variant/src/builder/metadata.rs
b/parquet-variant/src/builder/metadata.rs
index 10163ba3e0..efccc2e4c6 100644
--- a/parquet-variant/src/builder/metadata.rs
+++ b/parquet-variant/src/builder/metadata.rs
@@ -206,7 +206,7 @@ impl WritableMetadataBuilder {
// Determine appropriate offset size based on the larger of dict size
or total string size
let max_offset = std::cmp::max(total_dict_size, nkeys);
- let offset_size = int_size(max_offset);
+ let offset_size = int_size(max_offset) as u8;
let offset_start = 1 + offset_size as usize;
let string_start = offset_start + (nkeys + 1) * offset_size as usize;
diff --git a/parquet-variant/src/builder/object.rs
b/parquet-variant/src/builder/object.rs
index ab04360c16..876c2e2d4c 100644
--- a/parquet-variant/src/builder/object.rs
+++ b/parquet-variant/src/builder/object.rs
@@ -24,14 +24,50 @@ use crate::{
use arrow_schema::ArrowError;
use indexmap::IndexMap;
-fn object_header(large: bool, id_size: u8, offset_size: u8) -> u8 {
- let large_bit = if large { 1 } else { 0 };
- (large_bit << (BASIC_TYPE_BITS + 4))
- | ((id_size - 1) << (BASIC_TYPE_BITS + 2))
- | ((offset_size - 1) << BASIC_TYPE_BITS)
+fn object_header<const LARGE_BIT: u8, const ID_SIZE: u8, const OFFSET_SIZE:
u8>() -> u8 {
+ (LARGE_BIT << (BASIC_TYPE_BITS + 4))
+ | ((ID_SIZE - 1) << (BASIC_TYPE_BITS + 2))
+ | ((OFFSET_SIZE - 1) << BASIC_TYPE_BITS)
| VariantBasicType::Object as u8
}
+struct ObjectHeaderWriter<const OFFSET_SIZE: u8, const ID_SIZE: u8>();
+
+impl<const OFFSET_SIZE: u8, const ID_SIZE: u8> ObjectHeaderWriter<OFFSET_SIZE,
ID_SIZE> {
+ fn write(
+ dst: &mut Vec<u8>,
+ num_fields: usize,
+ field_ids: impl Iterator<Item = u32>,
+ offsets: impl Iterator<Item = usize>,
+ data_size: usize,
+ ) {
+ let is_large = num_fields > u8::MAX as usize;
+ // num_fields will consume 4 bytes when it is larger than u8::MAX
+ if is_large {
+ dst.push(object_header::<1, { ID_SIZE }, { OFFSET_SIZE }>());
+ append_packed_u32::<4>(dst, num_fields);
+ } else {
+ dst.push(object_header::<0, { ID_SIZE }, { OFFSET_SIZE }>());
+ append_packed_u32::<1>(dst, num_fields);
+ }
+
+ for id in field_ids {
+ append_packed_u32::<ID_SIZE>(dst, id as usize);
+ }
+
+ for off in offsets {
+ append_packed_u32::<OFFSET_SIZE>(dst, off);
+ }
+
+ append_packed_u32::<OFFSET_SIZE>(dst, data_size);
+ }
+}
+
+#[inline(always)]
+fn append_packed_u32<const SIZE: u8>(dest: &mut Vec<u8>, value: usize) {
+ dest.extend_from_slice(&value.to_le_bytes()[..SIZE as usize]);
+}
+
/// A builder for creating [`Variant::Object`] values.
///
/// See the examples on [`VariantBuilder`] for usage.
@@ -245,41 +281,45 @@ impl<'a, S: BuilderSpecificState> ObjectBuilder<'a, S> {
(num_fields * id_size as usize) + // field IDs
((num_fields + 1) * offset_size as usize); // field offsets +
data_size
+ let mut bytes_to_splice = Vec::with_capacity(header_size);
+
+ macro_rules! write_header {
+ ($offset_size:expr, $id_size:expr) => {
+ ObjectHeaderWriter::<{ $offset_size as u8 }, { $id_size as u8
}>::write(
+ &mut bytes_to_splice,
+ num_fields,
+ self.fields.keys().copied(),
+ self.fields.values().copied(),
+ data_size,
+ )
+ };
+ }
+
+ use crate::decoder::OffsetSizeBytes::*;
+ match (offset_size, id_size) {
+ (One, One) => write_header!(One, One),
+ (One, Two) => write_header!(One, Two),
+ (One, Three) => write_header!(One, Three),
+ (One, Four) => write_header!(One, Four),
+ (Two, One) => write_header!(Two, One),
+ (Two, Two) => write_header!(Two, Two),
+ (Two, Three) => write_header!(Two, Three),
+ (Two, Four) => write_header!(Two, Four),
+ (Three, One) => write_header!(Three, One),
+ (Three, Two) => write_header!(Three, Two),
+ (Three, Three) => write_header!(Three, Three),
+ (Three, Four) => write_header!(Three, Four),
+ (Four, One) => write_header!(Four, One),
+ (Four, Two) => write_header!(Four, Two),
+ (Four, Three) => write_header!(Four, Three),
+ (Four, Four) => write_header!(Four, Four),
+ }
+
// Shift existing data to make room for the header
- value_builder.inner_mut().splice(
- starting_offset..starting_offset,
- std::iter::repeat_n(0u8, header_size),
- );
+ value_builder
+ .inner_mut()
+ .splice(starting_offset..starting_offset, bytes_to_splice);
- // Write header at the original start position
- let mut header_pos = starting_offset;
-
- // Write header byte
- let header = object_header(is_large, id_size, offset_size);
-
- header_pos = self
- .parent_state
- .value_builder()
- .append_header_start_from_buf_pos(header_pos, header, is_large,
num_fields);
-
- header_pos = self
- .parent_state
- .value_builder()
- .append_offset_array_start_from_buf_pos(
- header_pos,
- self.fields.keys().copied().map(|id| id as usize),
- None,
- id_size,
- );
-
- self.parent_state
- .value_builder()
- .append_offset_array_start_from_buf_pos(
- header_pos,
- self.fields.values().copied(),
- Some(data_size),
- offset_size,
- );
self.parent_state.finish();
}
}