This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new cf8582ba61 Add builder to help create Schemas for shredding (`ShreddedSchemaBuilder`) (#8940)
cf8582ba61 is described below

commit cf8582ba6132db88b12d7c917454cc197b2b6c65
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Thu Dec 11 14:33:10 2025 -0500

    Add builder to help create Schemas for shredding (`ShreddedSchemaBuilder`) (#8940)
    
    - Closes #8922
    
    # Rationale for this change
    
    Basically, a helper that simplifies constructing nested shredding types
    such as the one checked by the `assert_eq!` below:
    
    ```rust
    let shredding_type = ShreddedSchemaBuilder::default()
        .with_path("a", &DataType::Int64)
        .with_path("b.c", &DataType::Utf8)
        .with_path("b.d", &DataType::Float64)
        .build();
    
    assert_eq!(
        shredding_type,
        DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int64, true),
            Field::new(
                "b",
                DataType::Struct(Fields::from(vec![
                    Field::new("c", DataType::Utf8, true),
                    Field::new("d", DataType::Float64, true),
                ])),
                true
            ),
        ]))
    );
    ```
    
    # What changes are included in this PR?
    
    1. Added `ShreddedSchemaBuilder` (plus the `IntoShreddingField` conversion trait)
    2. Updated existing test cases to use this new builder
    
    # Are these changes tested?
    
    Yes
    
    # Are there any user-facing changes?
    Adds new public APIs: `ShreddedSchemaBuilder` and the `IntoShreddingField` trait
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet-variant-compute/src/lib.rs           |   2 +-
 parquet-variant-compute/src/shred_variant.rs | 494 +++++++++++++++++++++++++--
 parquet-variant/src/path.rs                  |  12 +-
 3 files changed, 483 insertions(+), 25 deletions(-)

diff --git a/parquet-variant-compute/src/lib.rs 
b/parquet-variant-compute/src/lib.rs
index f529c27a8f..9b8008f584 100644
--- a/parquet-variant-compute/src/lib.rs
+++ b/parquet-variant-compute/src/lib.rs
@@ -56,7 +56,7 @@ pub use variant_array_builder::{VariantArrayBuilder, 
VariantValueArrayBuilder};
 
 pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options};
 pub use from_json::json_to_variant;
-pub use shred_variant::shred_variant;
+pub use shred_variant::{IntoShreddingField, ShreddedSchemaBuilder, 
shred_variant};
 pub use to_json::variant_to_json;
 pub use type_conversion::CastOptions;
 pub use unshred_variant::unshred_variant;
diff --git a/parquet-variant-compute/src/shred_variant.rs 
b/parquet-variant-compute/src/shred_variant.rs
index 51306ebd16..7b8dc28d5f 100644
--- a/parquet-variant-compute/src/shred_variant.rs
+++ b/parquet-variant-compute/src/shred_variant.rs
@@ -25,11 +25,12 @@ use crate::{VariantArray, VariantValueArrayBuilder};
 use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder};
 use arrow::buffer::NullBuffer;
 use arrow::compute::CastOptions;
-use arrow::datatypes::{DataType, Fields, TimeUnit};
+use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
 use arrow::error::{ArrowError, Result};
-use parquet_variant::{Variant, VariantBuilderExt};
+use parquet_variant::{Variant, VariantBuilderExt, VariantPath, 
VariantPathElement};
 
 use indexmap::IndexMap;
+use std::collections::BTreeMap;
 use std::sync::Arc;
 
 /// Shreds the input binary variant using a target shredding schema derived 
from the requested data type.
@@ -63,6 +64,9 @@ use std::sync::Arc;
 ///   }
 /// }
 /// ```
+///
+/// See [`ShreddedSchemaBuilder`] for a convenient way to build the `as_type`
+/// value passed to this function.
 pub fn shred_variant(array: &VariantArray, as_type: &DataType) -> 
Result<VariantArray> {
     if array.typed_value_field().is_some() {
         return Err(ArrowError::InvalidArgumentError(
@@ -348,13 +352,238 @@ impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> {
     }
 }
 
+/// Field configuration captured by [`ShreddedSchemaBuilder`]: the target
+/// data type of a shredded path together with its nullability.
+///
+/// Values are produced through [`IntoShreddingField`]; the fields and the
+/// constructors below are private to this module.
+#[derive(Debug, Clone)]
+pub struct ShreddingField {
+    /// Arrow type the path is shredded to.
+    data_type: DataType,
+    /// Whether the generated Arrow field is nullable.
+    nullable: bool,
+}
+
+impl ShreddingField {
+    /// Creates a field configuration from a data type and a nullability flag.
+    fn new(data_type: DataType, nullable: bool) -> Self {
+        Self {
+            data_type,
+            nullable,
+        }
+    }
+
+    /// Placeholder configuration (`DataType::Null`, nullable) used for schema
+    /// nodes that have not been assigned a type yet.
+    fn null() -> Self {
+        Self::new(DataType::Null, true)
+    }
+}
+
+/// Convenience conversion to allow passing either `FieldRef`, `DataType`, or 
`(DataType, bool)`.
+pub trait IntoShreddingField {
+    fn into_shredding_field(self) -> ShreddingField;
+}
+
+impl IntoShreddingField for FieldRef {
+    fn into_shredding_field(self) -> ShreddingField {
+        ShreddingField::new(self.data_type().clone(), self.is_nullable())
+    }
+}
+
+impl IntoShreddingField for &DataType {
+    fn into_shredding_field(self) -> ShreddingField {
+        ShreddingField::new(self.clone(), true)
+    }
+}
+
+impl IntoShreddingField for DataType {
+    fn into_shredding_field(self) -> ShreddingField {
+        ShreddingField::new(self, true)
+    }
+}
+
+impl IntoShreddingField for (&DataType, bool) {
+    fn into_shredding_field(self) -> ShreddingField {
+        ShreddingField::new(self.0.clone(), self.1)
+    }
+}
+
+impl IntoShreddingField for (DataType, bool) {
+    fn into_shredding_field(self) -> ShreddingField {
+        ShreddingField::new(self.0, self.1)
+    }
+}
+
+/// Builder for constructing a variant shredding schema.
+///
+/// The builder pattern makes it easy to incrementally define which fields
+/// should be shredded and with what types. Fields are nullable by default; 
pass
+/// a `(data_type, nullable)` pair or a `FieldRef` to control nullability.
+///
+/// Note: this builder currently only supports struct fields. List support
+/// will be added in the future.
+///
+/// # Example
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow::datatypes::{DataType, Field, TimeUnit};
+/// use parquet_variant::{VariantPath, VariantPathElement};
+/// use parquet_variant_compute::ShreddedSchemaBuilder;
+///
+/// // Define the shredding schema using the builder
+/// let shredding_type = ShreddedSchemaBuilder::default()
+///     // store the "time" field as a separate UTC timestamp
+///     .with_path("time", (&DataType::Timestamp(TimeUnit::Nanosecond, 
Some("UTC".into())), true))
+///     // store hostname as non-nullable Utf8
+///     .with_path("hostname", (&DataType::Utf8, false))
+///     // pass a FieldRef directly
+///     .with_path(
+///         "metadata.trace_id",
+///         Arc::new(Field::new("trace_id", DataType::FixedSizeBinary(16), 
false)),
+///     )
+///     // field name with a dot: use VariantPath to avoid splitting
+///     .with_path(
+///         VariantPath::from_iter([VariantPathElement::from("metrics.cpu")]),
+///         &DataType::Float64,
+///     )
+///     .build();
+///
+/// // The shredding_type can now be passed to shred_variant:
+/// // let shredded = shred_variant(&input, &shredding_type)?;
+/// ```
+#[derive(Default, Clone)]
+pub struct ShreddedSchemaBuilder {
+    root: VariantSchemaNode,
+}
+
+impl ShreddedSchemaBuilder {
+    /// Create a new empty schema builder.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Insert a typed path into the schema using dot notation (or any
+    /// [`VariantPath`] convertible).
+    ///
+    /// The path uses dot notation to specify nested fields.
+    /// For example, "a.b.c" will create a nested structure.
+    ///
+    /// # Arguments
+    ///
+    /// * `path` - Anything convertible to [`VariantPath`] (e.g., a `&str`)
+    /// * `field` - Anything convertible via [`IntoShreddingField`] (e.g. 
`FieldRef`,
+    ///   `&DataType`, or `(&DataType, bool)` to control nullability)
+    pub fn with_path<'a, P, F>(mut self, path: P, field: F) -> Self
+    where
+        P: Into<VariantPath<'a>>,
+        F: IntoShreddingField,
+    {
+        let path: VariantPath<'a> = path.into();
+        self.root.insert_path(&path, field.into_shredding_field());
+        self
+    }
+
+    /// Build the final [`DataType`].
+    pub fn build(self) -> DataType {
+        let shredding_type = self.root.to_shredding_type();
+        match shredding_type {
+            Some(shredding_type) => shredding_type,
+            None => DataType::Null,
+        }
+    }
+}
+
+/// Internal tree node structure for building variant schemas.
+#[derive(Clone)]
+enum VariantSchemaNode {
+    /// A leaf node holding the type (and nullability) supplied for a path
+    Leaf(ShreddingField),
+    /// An inner struct node with nested fields, keyed by field name. The
+    /// `BTreeMap` keeps the generated struct fields sorted by name.
+    Struct(BTreeMap<String, VariantSchemaNode>),
+}
+
+impl Default for VariantSchemaNode {
+    /// An untyped leaf (`DataType::Null`, nullable), i.e. an empty schema.
+    fn default() -> Self {
+        Self::Leaf(ShreddingField::null())
+    }
+}
+
+impl VariantSchemaNode {
+    /// Insert a path into this node with the given field configuration.
+    fn insert_path(&mut self, path: &VariantPath<'_>, field: ShreddingField) {
+        self.insert_path_elements(path, field);
+    }
+
+    /// Recursively inserts `segments` below this node.
+    ///
+    /// An empty `segments` slice replaces this node with a leaf. A field
+    /// segment turns this node into a struct (discarding any leaf type it held)
+    /// and descends into the named child.
+    ///
+    /// # Panics
+    ///
+    /// Panics on list index segments, which are not supported yet.
+    fn insert_path_elements(&mut self, segments: &[VariantPathElement<'_>], field: ShreddingField) {
+        let Some((head, tail)) = segments.split_first() else {
+            *self = Self::Leaf(field);
+            return;
+        };
+
+        match head {
+            VariantPathElement::Field { name } => {
+                // A field segment needs a struct node; replace any leaf found here.
+                if !matches!(self, Self::Struct(_)) {
+                    *self = Self::Struct(BTreeMap::new());
+                }
+                let Self::Struct(children) = self else {
+                    unreachable!("node was converted to a struct above");
+                };
+
+                children
+                    .entry(name.to_string())
+                    .or_default()
+                    .insert_path_elements(tail, field);
+            }
+            VariantPathElement::Index { .. } => {
+                // Any caller-supplied `VariantPath` can contain an index, so this
+                // is an unsupported feature rather than an internal invariant.
+                unimplemented!("List paths are not supported yet");
+            }
+        }
+    }
+
+    /// Convert this node to a shredding type.
+    ///
+    /// Returns the [`DataType`] for passing to [`shred_variant`], or `None`
+    /// for a struct node that has no children.
+    fn to_shredding_type(&self) -> Option<DataType> {
+        match self {
+            Self::Leaf(field) => Some(field.data_type.clone()),
+            Self::Struct(children) => {
+                let child_fields: Vec<FieldRef> = children
+                    .iter()
+                    .filter_map(|(name, child)| child.to_shredding_field(name))
+                    .collect();
+                if child_fields.is_empty() {
+                    None
+                } else {
+                    Some(DataType::Struct(Fields::from(child_fields)))
+                }
+            }
+        }
+    }
+
+    /// Convert this node into an Arrow field named `name`.
+    ///
+    /// Leaves keep their configured nullability; struct nodes always produce a
+    /// nullable field. Returns `None` when the node yields no type.
+    fn to_shredding_field(&self, name: &str) -> Option<FieldRef> {
+        match self {
+            Self::Leaf(field) => Some(Arc::new(Field::new(
+                name,
+                field.data_type.clone(),
+                field.nullable,
+            ))),
+            Self::Struct(_) => self
+                .to_shredding_type()
+                .map(|data_type| Arc::new(Field::new(name, data_type, true))),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::VariantArrayBuilder;
     use arrow::array::{Array, FixedSizeBinaryArray, Float64Array, Int64Array};
     use arrow::datatypes::{DataType, Field, Fields, TimeUnit, UnionFields, 
UnionMode};
-    use parquet_variant::{ObjectBuilder, ReadOnlyMetadataBuilder, Variant, 
VariantBuilder};
+    use parquet_variant::{
+        ObjectBuilder, ReadOnlyMetadataBuilder, Variant, VariantBuilder, 
VariantPath,
+        VariantPathElement,
+    };
     use std::sync::Arc;
     use uuid::Uuid;
 
@@ -668,11 +897,10 @@ mod tests {
 
         // Create target schema: struct<score: float64, age: int64>
         // Both types are supported for shredding
-        let fields = Fields::from(vec![
-            Field::new("score", DataType::Float64, true),
-            Field::new("age", DataType::Int64, true),
-        ]);
-        let target_schema = DataType::Struct(fields);
+        let target_schema = ShreddedSchemaBuilder::default()
+            .with_path("score", &DataType::Float64)
+            .with_path("age", &DataType::Int64)
+            .build();
 
         let result = shred_variant(&input, &target_schema).unwrap();
 
@@ -992,26 +1220,28 @@ mod tests {
         let input = builder.build();
 
         // Test with schema containing only id field
-        let schema1 = DataType::Struct(Fields::from(vec![Field::new("id", 
DataType::Int32, true)]));
+        let schema1 = ShreddedSchemaBuilder::default()
+            .with_path("id", &DataType::Int32)
+            .build();
         let result1 = shred_variant(&input, &schema1).unwrap();
         let value_field1 = result1.value_field().unwrap();
         assert!(!value_field1.is_null(0)); // should contain {"age": 25, 
"score": 95.5}
 
         // Test with schema containing id and age fields
-        let schema2 = DataType::Struct(Fields::from(vec![
-            Field::new("id", DataType::Int32, true),
-            Field::new("age", DataType::Int64, true),
-        ]));
+        let schema2 = ShreddedSchemaBuilder::default()
+            .with_path("id", &DataType::Int32)
+            .with_path("age", &DataType::Int64)
+            .build();
         let result2 = shred_variant(&input, &schema2).unwrap();
         let value_field2 = result2.value_field().unwrap();
         assert!(!value_field2.is_null(0)); // should contain {"score": 95.5}
 
         // Test with schema containing all fields
-        let schema3 = DataType::Struct(Fields::from(vec![
-            Field::new("id", DataType::Int32, true),
-            Field::new("age", DataType::Int64, true),
-            Field::new("score", DataType::Float64, true),
-        ]));
+        let schema3 = ShreddedSchemaBuilder::default()
+            .with_path("id", &DataType::Int32)
+            .with_path("age", &DataType::Int64)
+            .with_path("score", &DataType::Float64)
+            .build();
         let result3 = shred_variant(&input, &schema3).unwrap();
         let value_field3 = result3.value_field().unwrap();
         assert!(value_field3.is_null(0)); // fully shredded, no remaining 
fields
@@ -1062,11 +1292,10 @@ mod tests {
 
         let input = builder.build();
 
-        let fields = Fields::from(vec![
-            Field::new("id", DataType::FixedSizeBinary(16), true),
-            Field::new("session_id", DataType::FixedSizeBinary(16), true),
-        ]);
-        let target_schema = DataType::Struct(fields);
+        let target_schema = ShreddedSchemaBuilder::default()
+            .with_path("id", DataType::FixedSizeBinary(16))
+            .with_path("session_id", DataType::FixedSizeBinary(16))
+            .build();
 
         let result = shred_variant(&input, &target_schema).unwrap();
 
@@ -1244,4 +1473,223 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn test_variant_schema_builder_simple() {
+        let shredding_type = ShreddedSchemaBuilder::default()
+            .with_path("a", &DataType::Int64)
+            .with_path("b", &DataType::Float64)
+            .build();
+
+        assert_eq!(
+            shredding_type,
+            DataType::Struct(Fields::from(vec![
+                Field::new("a", DataType::Int64, true),
+                Field::new("b", DataType::Float64, true),
+            ]))
+        );
+    }
+
+    #[test]
+    fn test_variant_schema_builder_nested() {
+        // Dotted paths sharing the `b` prefix are merged into one nested struct.
+        let shredding_type = ShreddedSchemaBuilder::default()
+            .with_path("a", &DataType::Int64)
+            .with_path("b.c", &DataType::Utf8)
+            .with_path("b.d", &DataType::Float64)
+            .build();
+
+        assert_eq!(
+            shredding_type,
+            DataType::Struct(Fields::from(vec![
+                Field::new("a", DataType::Int64, true),
+                Field::new(
+                    "b",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("c", DataType::Utf8, true),
+                        Field::new("d", DataType::Float64, true),
+                    ])),
+                    true
+                ),
+            ]))
+        );
+    }
+
+    #[test]
+    fn test_variant_schema_builder_with_path_variant_path_arg() {
+        // A single path element containing a dot is used verbatim as a field name.
+        let path = VariantPath::from_iter([VariantPathElement::from("a.b")]);
+        let shredding_type = ShreddedSchemaBuilder::default()
+            .with_path(path, &DataType::Int64)
+            .build();
+
+        let DataType::Struct(fields) = shredding_type else {
+            panic!("expected struct data type");
+        };
+        assert_eq!(fields.len(), 1);
+        assert_eq!(fields[0].name(), "a.b");
+        assert_eq!(fields[0].data_type(), &DataType::Int64);
+    }
+
+    #[test]
+    fn test_variant_schema_builder_custom_nullability() {
+        // The name of a passed-in `FieldRef` is ignored; the path determines it.
+        let shredding_type = ShreddedSchemaBuilder::default()
+            .with_path(
+                "foo",
+                Arc::new(Field::new("should_be_renamed", DataType::Utf8, false)),
+            )
+            .with_path("bar", (&DataType::Int64, false))
+            .build();
+
+        let DataType::Struct(fields) = shredding_type else {
+            panic!("expected struct data type");
+        };
+        assert_eq!(fields.len(), 2);
+
+        let foo = fields.iter().find(|f| f.name() == "foo").unwrap();
+        assert_eq!(foo.data_type(), &DataType::Utf8);
+        assert!(!foo.is_nullable());
+
+        let bar = fields.iter().find(|f| f.name() == "bar").unwrap();
+        assert_eq!(bar.data_type(), &DataType::Int64);
+        assert!(!bar.is_nullable());
+    }
+
+    #[test]
+    fn test_variant_schema_builder_with_shred_variant() {
+        // Row 0 carries an extra unshredded field, row 1 only shredded fields,
+        // and row 2 is null.
+        let mut builder = VariantArrayBuilder::new(3);
+        builder
+            .new_object()
+            .with_field("time", 1234567890i64)
+            .with_field("hostname", "server1")
+            .with_field("extra", 42)
+            .finish();
+        builder
+            .new_object()
+            .with_field("time", 9876543210i64)
+            .with_field("hostname", "server2")
+            .finish();
+        builder.append_null();
+
+        let input = builder.build();
+
+        let shredding_type = ShreddedSchemaBuilder::default()
+            .with_path("time", &DataType::Int64)
+            .with_path("hostname", &DataType::Utf8)
+            .build();
+
+        let result = shred_variant(&input, &shredding_type).unwrap();
+
+        // Shredded fields appear in name order (`hostname` before `time`).
+        assert_eq!(
+            result.data_type(),
+            &DataType::Struct(Fields::from(vec![
+                Field::new("metadata", DataType::BinaryView, false),
+                Field::new("value", DataType::BinaryView, true),
+                Field::new(
+                    "typed_value",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new(
+                            "hostname",
+                            DataType::Struct(Fields::from(vec![
+                                Field::new("value", DataType::BinaryView, true),
+                                Field::new("typed_value", DataType::Utf8, true),
+                            ])),
+                            false,
+                        ),
+                        Field::new(
+                            "time",
+                            DataType::Struct(Fields::from(vec![
+                                Field::new("value", DataType::BinaryView, true),
+                                Field::new("typed_value", DataType::Int64, true),
+                            ])),
+                            false,
+                        ),
+                    ])),
+                    true,
+                ),
+            ]))
+        );
+
+        assert_eq!(result.len(), 3);
+        assert!(result.typed_value_field().is_some());
+
+        let typed_value = result
+            .typed_value_field()
+            .unwrap()
+            .as_any()
+            .downcast_ref::<arrow::array::StructArray>()
+            .unwrap();
+
+        let time_field =
+            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("time").unwrap())
+                .unwrap();
+        let hostname_field =
+            ShreddedVariantFieldArray::try_new(typed_value.column_by_name("hostname").unwrap())
+                .unwrap();
+
+        let time_typed = time_field
+            .typed_value_field()
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        let hostname_typed = hostname_field
+            .typed_value_field()
+            .unwrap()
+            .as_any()
+            .downcast_ref::<arrow::array::StringArray>()
+            .unwrap();
+
+        // Row 0
+        assert!(!result.is_null(0));
+        assert_eq!(time_typed.value(0), 1234567890);
+        assert_eq!(hostname_typed.value(0), "server1");
+
+        // Row 1
+        assert!(!result.is_null(1));
+        assert_eq!(time_typed.value(1), 9876543210);
+        assert_eq!(hostname_typed.value(1), "server2");
+
+        // Row 2
+        assert!(result.is_null(2));
+    }
+
+    #[test]
+    fn test_variant_schema_builder_conflicting_path() {
+        // Re-inserting `a` replaces its earlier type: the last definition wins.
+        let built = ShreddedSchemaBuilder::new()
+            .with_path("a", &DataType::Int64)
+            .with_path("a", &DataType::Float64)
+            .build();
+
+        let expected_field = Field::new("a", DataType::Float64, true);
+        assert_eq!(built, DataType::Struct(Fields::from(vec![expected_field])));
+    }
+
+    #[test]
+    fn test_variant_schema_builder_root_path() {
+        // An empty `VariantPath` addresses the root, so the result is the bare type.
+        let root = VariantPath::new(vec![]);
+        let built = ShreddedSchemaBuilder::new()
+            .with_path(root, &DataType::Int64)
+            .build();
+
+        assert_eq!(built, DataType::Int64);
+    }
+
+    #[test]
+    fn test_variant_schema_builder_empty_path() {
+        // `""` parses to the empty path and therefore also addresses the root.
+        let built = ShreddedSchemaBuilder::new()
+            .with_path("", &DataType::Int64)
+            .build();
+
+        assert_eq!(built, DataType::Int64);
+    }
+
+    #[test]
+    fn test_variant_schema_builder_default() {
+        // A builder without any paths produces `DataType::Null`.
+        assert_eq!(ShreddedSchemaBuilder::default().build(), DataType::Null);
+    }
 }
diff --git a/parquet-variant/src/path.rs b/parquet-variant/src/path.rs
index a7010ba61c..e222c3ac9c 100644
--- a/parquet-variant/src/path.rs
+++ b/parquet-variant/src/path.rs
@@ -112,7 +112,11 @@ impl<'a> From<Vec<VariantPathElement<'a>>> for 
VariantPath<'a> {
 /// Create from &str with support for dot notation
 impl<'a> From<&'a str> for VariantPath<'a> {
     fn from(path: &'a str) -> Self {
-        VariantPath::new(path.split('.').map(Into::into).collect())
+        if path.is_empty() {
+            VariantPath::new(vec![])
+        } else {
+            VariantPath::new(path.split('.').map(Into::into).collect())
+        }
     }
 }
 
@@ -207,6 +211,12 @@ mod tests {
         assert!(path.is_empty());
     }
 
+    #[test]
+    fn test_variant_path_empty_str() {
+        // The empty string must map to the root path rather than to `[""]`.
+        assert!(VariantPath::from("").is_empty());
+    }
+
     #[test]
     fn test_variant_path_non_empty() {
         let p = VariantPathElement::from("a");

Reply via email to