This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 7276819d0d Split out variant code into several new sub-modules (#7717)
7276819d0d is described below

commit 7276819d0d6b2357ae57704af98e2025cddbef70
Author: Ryan Johnson <[email protected]>
AuthorDate: Fri Jun 20 06:21:47 2025 -0700

    Split out variant code into several new sub-modules (#7717)
    
    # Which issue does this PR close?
    
    Housekeeping, part of
    * https://github.com/apache/arrow-rs/issues/6736
    
    # Rationale for this change
    
    The variant module was starting to become unwieldy.
    
    # What changes are included in this PR?
    
    Split out metadata, object, and list to sub-modules; move `OffsetSize`
    to the decoder module where it arguably belongs.
    
    Result: variant.rs is "only" ~900 LoC instead of ~2k LoC.
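    
    A rough sketch of the new layout (inferred from the file list below):
    * `decoder.rs`: now also hosts the `OffsetSizeBytes` helper
    * `variant.rs`: the `Variant` enum plus `pub use` re-exports of the sub-modules
    * `variant/list.rs`: `VariantList`
    * `variant/metadata.rs`: `VariantMetadata`
    * `variant/object.rs`: `VariantObject`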
    
    # Are there any user-facing changes?
    
    No. Public re-exports should hide the change from users.
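    
    For illustration, downstream code along the following lines should keep compiling
    unchanged. This is a hedged sketch only: it assumes the pre-existing crate-root
    re-exports (e.g. `parquet_variant::VariantMetadata`) remain in place, exactly as
    before this change.
    
    ```rust
    // Hypothetical usage; the import path assumes the existing crate-root
    // re-export rather than the new (private) sub-module paths.
    use parquet_variant::VariantMetadata;
    
    fn print_first_key(metadata_bytes: &[u8]) -> Result<(), arrow_schema::ArrowError> {
        let metadata = VariantMetadata::try_new(metadata_bytes)?;
        // The public API (dictionary_size, get, iter) is untouched by the split.
        if metadata.dictionary_size() > 0 {
            println!("first dictionary key: {}", metadata.get(0)?);
        }
        Ok(())
    }
    ```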
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet-variant/src/decoder.rs          | 182 +++++-
 parquet-variant/src/variant.rs          | 997 +-------------------------------
 parquet-variant/src/variant/list.rs     | 297 ++++++++++
 parquet-variant/src/variant/metadata.rs | 287 +++++++++
 parquet-variant/src/variant/object.rs   | 311 ++++++++++
 5 files changed, 1080 insertions(+), 994 deletions(-)

diff --git a/parquet-variant/src/decoder.rs b/parquet-variant/src/decoder.rs
index c4ab80091b..7fb41c7da2 100644
--- a/parquet-variant/src/decoder.rs
+++ b/parquet-variant/src/decoder.rs
@@ -14,11 +14,13 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use crate::utils::{array_from_slice, slice_from_slice, string_from_slice};
+
 use arrow_schema::ArrowError;
 use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
-use std::array::TryFromSliceError;
 
-use crate::utils::{array_from_slice, slice_from_slice, string_from_slice};
+use std::array::TryFromSliceError;
+use std::num::TryFromIntError;
 
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub enum VariantBasicType {
@@ -50,10 +52,10 @@ pub enum VariantPrimitiveType {
 }
 
 /// Extracts the basic type from a header byte
-pub(crate) fn get_basic_type(header: u8) -> Result<VariantBasicType, ArrowError> {
+pub(crate) fn get_basic_type(header: u8) -> VariantBasicType {
     // See https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#value-encoding
     let basic_type = header & 0x03; // Basic type is encoded in the first 2 bits
-    let basic_type = match basic_type {
+    match basic_type {
         0 => VariantBasicType::Primitive,
         1 => VariantBasicType::ShortString,
         2 => VariantBasicType::Object,
@@ -63,8 +65,7 @@ pub(crate) fn get_basic_type(header: u8) -> Result<VariantBasicType, ArrowError>
             // masked `basic_type` with 0x03 above.
             unreachable!();
         }
-    };
-    Ok(basic_type)
+    }
 }
 
 impl TryFrom<u8> for VariantPrimitiveType {
@@ -96,6 +97,76 @@ impl TryFrom<u8> for VariantPrimitiveType {
         }
     }
 }
+
+/// Used to unpack offset array entries such as metadata dictionary offsets or object/array value
+/// offsets. Also used to unpack object field ids. These are always derived from a two-bit
+/// `XXX_size_minus_one` field in the corresponding header byte.
+#[derive(Clone, Debug, Copy, PartialEq)]
+pub(crate) enum OffsetSizeBytes {
+    One = 1,
+    Two = 2,
+    Three = 3,
+    Four = 4,
+}
+
+impl OffsetSizeBytes {
+    /// Build from the `offset_size_minus_one` bits (see spec).
+    pub(crate) fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
+        use OffsetSizeBytes::*;
+        let result = match offset_size_minus_one {
+            0 => One,
+            1 => Two,
+            2 => Three,
+            3 => Four,
+            _ => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "offset_size_minus_one must be 0–3".to_string(),
+                ))
+            }
+        };
+        Ok(result)
+    }
+
+    /// Return one unsigned little-endian value from `bytes`.
+    ///
+    /// * `bytes` – the Variant-metadata buffer.
+    /// * `byte_offset` – number of bytes to skip **before** reading the first
+    ///   value (usually `1` to move past the header byte).
+    /// * `offset_index` – 0-based index **after** the skip
+    ///   (`0` is the first value, `1` the next, …).
+    ///
+    /// Each value is `self as usize` bytes wide (1, 2, 3 or 4).
+    /// Three-byte values are zero-extended to 32 bits before the final
+    /// fallible cast to `usize`.
+    pub(crate) fn unpack_usize(
+        &self,
+        bytes: &[u8],
+        byte_offset: usize,  // how many bytes to skip
+        offset_index: usize, // which offset in an array of offsets
+    ) -> Result<usize, ArrowError> {
+        use OffsetSizeBytes::*;
+        let offset = byte_offset + (*self as usize) * offset_index;
+        let result = match self {
+            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
+            Three => {
+                // Let's grab the three byte le-chunk first
+                let b3_chunks: [u8; 3] = array_from_slice(bytes, offset)?;
+                // Let's pad it and construct a padded u32 from it.
+                let mut buf = [0u8; 4];
+                buf[..3].copy_from_slice(&b3_chunks);
+                u32::from_le_bytes(buf)
+                    .try_into()
+                    .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?
+            }
+            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
+                .try_into()
+                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,
+        };
+        Ok(result)
+    }
+}
+
 /// Extract the primitive type from a Variant value-metadata byte
 pub(crate) fn get_primitive_type(metadata: u8) -> Result<VariantPrimitiveType, ArrowError> {
     // last 6 bits contain the primitive-type, see spec
@@ -363,4 +434,103 @@ mod tests {
         assert_eq!(result, "Hello");
         Ok(())
     }
+
+    #[test]
+    fn test_offset() {
+        assert_eq!(OffsetSizeBytes::try_new(0).unwrap(), OffsetSizeBytes::One);
+        assert_eq!(OffsetSizeBytes::try_new(1).unwrap(), OffsetSizeBytes::Two);
+        assert_eq!(OffsetSizeBytes::try_new(2).unwrap(), OffsetSizeBytes::Three);
+        assert_eq!(OffsetSizeBytes::try_new(3).unwrap(), OffsetSizeBytes::Four);
+
+        // everything outside 0-3 must error
+        assert!(OffsetSizeBytes::try_new(4).is_err());
+        assert!(OffsetSizeBytes::try_new(255).is_err());
+    }
+
+    #[test]
+    fn unpack_usize_all_widths() {
+        // One-byte offsets
+        let buf_one = [0x01u8, 0xAB, 0xCD];
+        assert_eq!(
+            OffsetSizeBytes::One.unpack_usize(&buf_one, 0, 0).unwrap(),
+            0x01
+        );
+        assert_eq!(
+            OffsetSizeBytes::One.unpack_usize(&buf_one, 0, 2).unwrap(),
+            0xCD
+        );
+
+        // Two-byte offsets (little-endian 0x1234, 0x5678)
+        let buf_two = [0x34, 0x12, 0x78, 0x56];
+        assert_eq!(
+            OffsetSizeBytes::Two.unpack_usize(&buf_two, 0, 0).unwrap(),
+            0x1234
+        );
+        assert_eq!(
+            OffsetSizeBytes::Two.unpack_usize(&buf_two, 0, 1).unwrap(),
+            0x5678
+        );
+
+        // Three-byte offsets (0x030201 and 0x0000FF)
+        let buf_three = [0x01, 0x02, 0x03, 0xFF, 0x00, 0x00];
+        assert_eq!(
+            OffsetSizeBytes::Three
+                .unpack_usize(&buf_three, 0, 0)
+                .unwrap(),
+            0x030201
+        );
+        assert_eq!(
+            OffsetSizeBytes::Three
+                .unpack_usize(&buf_three, 0, 1)
+                .unwrap(),
+            0x0000FF
+        );
+
+        // Four-byte offsets (0x12345678, 0x90ABCDEF)
+        let buf_four = [0x78, 0x56, 0x34, 0x12, 0xEF, 0xCD, 0xAB, 0x90];
+        assert_eq!(
+            OffsetSizeBytes::Four.unpack_usize(&buf_four, 0, 0).unwrap(),
+            0x1234_5678
+        );
+        assert_eq!(
+            OffsetSizeBytes::Four.unpack_usize(&buf_four, 0, 1).unwrap(),
+            0x90AB_CDEF
+        );
+    }
+
+    #[test]
+    fn unpack_usize_out_of_bounds() {
+        let tiny = [0x00u8]; // deliberately too short
+        assert!(OffsetSizeBytes::Two.unpack_usize(&tiny, 0, 0).is_err());
+        assert!(OffsetSizeBytes::Three.unpack_usize(&tiny, 0, 0).is_err());
+    }
+
+    #[test]
+    fn unpack_simple() {
+        let buf = [
+            0x41, // header
+            0x02, 0x00, // dictionary_size = 2
+            0x00, 0x00, // offset[0] = 0
+            0x05, 0x00, // offset[1] = 5
+            0x09, 0x00, // offset[2] = 9
+        ];
+
+        let width = OffsetSizeBytes::Two;
+
+        // dictionary_size starts immediately after the header
+        let dict_size = width.unpack_usize(&buf, 1, 0).unwrap();
+        assert_eq!(dict_size, 2);
+
+        let first = width.unpack_usize(&buf, 1, 1).unwrap();
+        assert_eq!(first, 0);
+
+        let second = width.unpack_usize(&buf, 1, 2).unwrap();
+        assert_eq!(second, 5);
+
+        let third = width.unpack_usize(&buf, 1, 3).unwrap();
+        assert_eq!(third, 9);
+
+        let err = width.unpack_usize(&buf, 1, 4);
+        assert!(err.is_err())
+    }
 }
diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs
index d55591f766..843fe2048c 100644
--- a/parquet-variant/src/variant.rs
+++ b/parquet-variant/src/variant.rs
@@ -14,535 +14,20 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+pub use self::list::VariantList;
+pub use self::metadata::VariantMetadata;
+pub use self::object::VariantObject;
 use crate::decoder::{
     self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType,
 };
-use crate::utils::{
-    array_from_slice, first_byte_from_slice, slice_from_slice, string_from_slice,
-    try_binary_search_range_by, validate_fallible_iterator,
-};
+use crate::utils::{first_byte_from_slice, slice_from_slice};
+
 use arrow_schema::ArrowError;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
-use std::num::TryFromIntError;
-
-/// The number of bytes used to store offsets in the [`VariantMetadataHeader`]
-#[derive(Clone, Debug, Copy, PartialEq)]
-enum OffsetSizeBytes {
-    One = 1,
-    Two = 2,
-    Three = 3,
-    Four = 4,
-}
-
-impl OffsetSizeBytes {
-    /// Build from the `offset_size_minus_one` bits (see spec).
-    fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
-        use OffsetSizeBytes::*;
-        let result = match offset_size_minus_one {
-            0 => One,
-            1 => Two,
-            2 => Three,
-            3 => Four,
-            _ => {
-                return Err(ArrowError::InvalidArgumentError(
-                    "offset_size_minus_one must be 0–3".to_string(),
-                ))
-            }
-        };
-        Ok(result)
-    }
-
-    /// Return one unsigned little-endian value from `bytes`.
-    ///
-    /// * `bytes` – the Variant-metadata buffer.
-    /// * `byte_offset` – number of bytes to skip **before** reading the first
-    ///   value (usually `1` to move past the header byte).
-    /// * `offset_index` – 0-based index **after** the skip
-    ///   (`0` is the first value, `1` the next, …).
-    ///
-    /// Each value is `self as usize` bytes wide (1, 2, 3 or 4).
-    /// Three-byte values are zero-extended to 32 bits before the final
-    /// fallible cast to `usize`.
-    fn unpack_usize(
-        &self,
-        bytes: &[u8],
-        byte_offset: usize,  // how many bytes to skip
-        offset_index: usize, // which offset in an array of offsets
-    ) -> Result<usize, ArrowError> {
-        use OffsetSizeBytes::*;
-        let offset = byte_offset + (*self as usize) * offset_index;
-        let result = match self {
-            One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
-            Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
-            Three => {
-                // Let's grab the three byte le-chunk first
-                let b3_chunks: [u8; 3] = array_from_slice(bytes, offset)?;
-                // Let's pad it and construct a padded u32 from it.
-                let mut buf = [0u8; 4];
-                buf[..3].copy_from_slice(&b3_chunks);
-                u32::from_le_bytes(buf)
-                    .try_into()
-                    .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?
-            }
-            Four => u32::from_le_bytes(array_from_slice(bytes, offset)?)
-                .try_into()
-                .map_err(|e: TryFromIntError| ArrowError::InvalidArgumentError(e.to_string()))?,
-        };
-        Ok(result)
-    }
-}
-
-/// Header structure for [`VariantMetadata`]
-#[derive(Clone, Debug, Copy, PartialEq)]
-pub(crate) struct VariantMetadataHeader {
-    version: u8,
-    is_sorted: bool,
-    /// Note: This is `offset_size_minus_one` + 1
-    offset_size: OffsetSizeBytes,
-}
-
-// According to the spec this is currently always = 1, and so we store this const for validation
-// purposes and to make that visible.
-const CORRECT_VERSION_VALUE: u8 = 1;
-
-impl VariantMetadataHeader {
-    /// Tries to construct the variant metadata header, which has the form
-    ///
-    /// ```text
-    ///              7     6  5   4  3             0
-    ///             +-------+---+---+---------------+
-    /// header      |       |   |   |    version    |
-    ///             +-------+---+---+---------------+
-    ///                 ^         ^
-    ///                 |         +-- sorted_strings
-    ///                 +-- offset_size_minus_one
-    /// ```
-    ///
-    /// The version is a 4-bit value that must always contain the value 1.
-    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
-    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
-    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
-    pub(crate) fn try_new(header_byte: u8) -> Result<Self, ArrowError> {
-        let version = header_byte & 0x0F; // First four bits
-        if version != CORRECT_VERSION_VALUE {
-            let err_msg = format!(
-                "The version bytes in the header is not {CORRECT_VERSION_VALUE}, got {:b}",
-                version
-            );
-            return Err(ArrowError::InvalidArgumentError(err_msg));
-        }
-        let is_sorted = (header_byte & 0x10) != 0; // Fifth bit
-        let offset_size_minus_one = header_byte >> 6; // Last two bits
-        Ok(Self {
-            version,
-            is_sorted,
-            offset_size: OffsetSizeBytes::try_new(offset_size_minus_one)?,
-        })
-    }
-}
-
-/// [`Variant`] Metadata
-///
-/// See the [Variant Spec] file for more information
-///
-/// [Variant Spec]: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#metadata-encoding
-#[derive(Clone, Copy, Debug, PartialEq)]
-pub struct VariantMetadata<'m> {
-    bytes: &'m [u8],
-    header: VariantMetadataHeader,
-    dict_size: usize,
-    dictionary_key_start_byte: usize,
-}
-
-impl<'m> VariantMetadata<'m> {
-    /// View the raw bytes (needed by very low-level decoders)
-    #[inline]
-    pub const fn as_bytes(&self) -> &'m [u8] {
-        self.bytes
-    }
-
-    /// Attempts to interpret `bytes` as a variant metadata instance.
-    ///
-    /// # Validation
-    ///
-    /// This constructor verifies that `bytes` points to a valid variant metadata instance. In
-    /// particular, all offsets are in-bounds and point to valid utf8 strings.
-    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
-        let header_byte = first_byte_from_slice(bytes)?;
-        let header = VariantMetadataHeader::try_new(header_byte)?;
-
-        // Offset 1, index 0 because first element after header is dictionary size
-        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
-
-        // Calculate the starting offset of the dictionary string bytes.
-        //
-        // Value header, dict_size (offset_size bytes), and dict_size+1 offsets
-        // = 1 + offset_size + (dict_size + 1) * offset_size
-        // = (dict_size + 2) * offset_size + 1
-        let dictionary_key_start_byte = dict_size
-            .checked_add(2)
-            .and_then(|n| n.checked_mul(header.offset_size as usize))
-            .and_then(|n| n.checked_add(1))
-            .ok_or_else(|| ArrowError::InvalidArgumentError("metadata length overflow".into()))?;
-        println!("dictionary_key_start_byte: {dictionary_key_start_byte}");
-        let s = Self {
-            bytes,
-            header,
-            dict_size,
-            dictionary_key_start_byte,
-        };
-
-        // Iterate over all string keys in this dictionary in order to validate the offset array and
-        // prove that the string bytes are all in bounds. Otherwise, `iter` might panic on `unwrap`.
-        validate_fallible_iterator(s.iter_checked())?;
-        Ok(s)
-    }
-
-    /// Whether the dictionary keys are sorted and unique
-    pub fn is_sorted(&self) -> bool {
-        self.header.is_sorted
-    }
-
-    /// Get the dictionary size
-    pub fn dictionary_size(&self) -> usize {
-        self.dict_size
-    }
-
-    /// The variant protocol version
-    pub fn version(&self) -> u8 {
-        self.header.version
-    }
-
-    /// Gets an offset array entry by index.
-    ///
-    /// This offset is an index into the dictionary, at the boundary between string `i-1` and string
-    /// `i`. See [`Self::get`] to retrieve a specific dictionary entry.
-    fn get_offset(&self, i: usize) -> Result<usize, ArrowError> {
-        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
-        let bytes = slice_from_slice(self.bytes, ..self.dictionary_key_start_byte)?;
-        self.header.offset_size.unpack_usize(bytes, 1, i + 1)
-    }
-
-    /// Gets a dictionary entry by index
-    pub fn get(&self, i: usize) -> Result<&'m str, ArrowError> {
-        let dictionary_keys_bytes = slice_from_slice(self.bytes, self.dictionary_key_start_byte..)?;
-        let byte_range = self.get_offset(i)?..self.get_offset(i + 1)?;
-        string_from_slice(dictionary_keys_bytes, byte_range)
-    }
-
-    /// Get all dictionary entries as an Iterator of strings
-    pub fn iter(&self) -> impl Iterator<Item = &'m str> + '_ {
-        // NOTE: It is safe to unwrap because the constructor already made a successful traversal.
-        self.iter_checked().map(Result::unwrap)
-    }
-
-    // Fallible iteration over the fields of this dictionary. The constructor traverses the iterator
-    // to prove it has no errors, so that all other use sites can blindly `unwrap` the result.
-    fn iter_checked(&self) -> impl Iterator<Item = Result<&'m str, ArrowError>> + '_ {
-        (0..self.dict_size).map(move |i| self.get(i))
-    }
-}
-
-/// Header structure for [`VariantObject`]
-#[derive(Clone, Debug, PartialEq)]
-pub(crate) struct VariantObjectHeader {
-    field_offset_size: OffsetSizeBytes,
-    field_id_size: OffsetSizeBytes,
-    is_large: bool,
-}
-
-impl VariantObjectHeader {
-    pub(crate) fn try_new(header_byte: u8) -> Result<Self, ArrowError> {
-        // Parse the header byte to get object parameters
-        let value_header = header_byte >> 2;
-        let field_offset_size_minus_one = value_header & 0x03; // Last 2 bits
-        let field_id_size_minus_one = (value_header >> 2) & 0x03; // Next 2 bits
-        let is_large = (value_header & 0x10) != 0; // 5th bit
-
-        Ok(Self {
-            field_offset_size: OffsetSizeBytes::try_new(field_offset_size_minus_one)?,
-            field_id_size: OffsetSizeBytes::try_new(field_id_size_minus_one)?,
-            is_large,
-        })
-    }
-}
-
-/// A [`Variant`] Object (struct with named fields).
-#[derive(Clone, Debug, PartialEq)]
-pub struct VariantObject<'m, 'v> {
-    pub metadata: VariantMetadata<'m>,
-    pub value: &'v [u8],
-    header: VariantObjectHeader,
-    num_elements: usize,
-    field_ids_start_byte: usize,
-    field_offsets_start_byte: usize,
-    values_start_byte: usize,
-}
-
-impl<'m, 'v> VariantObject<'m, 'v> {
-    /// Attempts to interpret `value` as a variant object value.
-    ///
-    /// # Validation
-    ///
-    /// This constructor verifies that `value` points to a valid variant object value. In
-    /// particular, that all field ids exist in `metadata`, and all offsets are in-bounds and point
-    /// to valid objects.
-    // TODO: How to make the validation non-recursive while still making iterators safely infallible??
-    // See https://github.com/apache/arrow-rs/issues/7711
-    pub fn try_new(metadata: VariantMetadata<'m>, value: &'v [u8]) -> Result<Self, ArrowError> {
-        let header_byte = first_byte_from_slice(value)?;
-        let header = VariantObjectHeader::try_new(header_byte)?;
-
-        // Determine num_elements size based on is_large flag
-        let num_elements_size = if header.is_large {
-            OffsetSizeBytes::Four
-        } else {
-            OffsetSizeBytes::One
-        };
-
-        // Parse num_elements
-        let num_elements = num_elements_size.unpack_usize(value, 1, 0)?;
-
-        // Calculate byte offsets for different sections
-        let field_ids_start_byte = 1 + num_elements_size as usize;
-        let field_offsets_start_byte =
-            field_ids_start_byte + num_elements * header.field_id_size as usize;
-        let values_start_byte =
-            field_offsets_start_byte + (num_elements + 1) * header.field_offset_size as usize;
-
-        // Spec says: "The last field_offset points to the byte after the end of the last value"
-        //
-        // Use the last offset as a bounds check. The iterator check below doesn't use it -- offsets
-        // are not monotonic -- so we have to check separately here.
-        let last_field_offset =
-            header
-                .field_offset_size
-                .unpack_usize(value, field_offsets_start_byte, num_elements)?;
-        if values_start_byte + last_field_offset > value.len() {
-            return Err(ArrowError::InvalidArgumentError(format!(
-                "Last field offset value {} at offset {} is outside the value slice of length {}",
-                last_field_offset,
-                values_start_byte,
-                value.len()
-            )));
-        }
-
-        let s = Self {
-            metadata,
-            value,
-            header,
-            num_elements,
-            field_ids_start_byte,
-            field_offsets_start_byte,
-            values_start_byte,
-        };
-
-        // Iterate over all fields of this object in order to validate the field_id and field_offset
-        // arrays, and also to prove the field values are all in bounds. Otherwise, `iter` might
-        // panic on `unwrap`.
-        validate_fallible_iterator(s.iter_checked())?;
-        Ok(s)
-    }
-
-    /// Returns the number of key-value pairs in this object
-    pub fn len(&self) -> usize {
-        self.num_elements
-    }
-
-    /// Returns true if the object contains no key-value pairs
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    /// Get a field's value by index in `0..self.len()`
-    pub fn field(&self, i: usize) -> Result<Variant<'m, 'v>, ArrowError> {
-        let start_offset = self.header.field_offset_size.unpack_usize(
-            self.value,
-            self.field_offsets_start_byte,
-            i,
-        )?;
-        let value_bytes = slice_from_slice(self.value, self.values_start_byte + start_offset..)?;
-        Variant::try_new_with_metadata(self.metadata, value_bytes)
-    }
-
-    /// Get a field's name by index in `0..self.len()`
-    pub fn field_name(&self, i: usize) -> Result<&'m str, ArrowError> {
-        let field_id =
-            self.header
-                .field_id_size
-                .unpack_usize(self.value, self.field_ids_start_byte, i)?;
-        self.metadata.get(field_id)
-    }
-
-    /// Returns an iterator of (name, value) pairs over the fields of this object.
-    pub fn iter(&self) -> impl Iterator<Item = (&'m str, Variant<'m, 'v>)> + '_ {
-        // NOTE: It is safe to unwrap because the constructor already made a successful traversal.
-        self.iter_checked().map(Result::unwrap)
-    }
-
-    // Fallible iteration over the fields of this object. The constructor traverses the iterator to
-    // prove it has no errors, so that all other use sites can blindly `unwrap` the result.
-    fn iter_checked(
-        &self,
-    ) -> impl Iterator<Item = Result<(&'m str, Variant<'m, 'v>), ArrowError>> + '_ {
-        (0..self.num_elements).map(move |i| Ok((self.field_name(i)?, self.field(i)?)))
-    }
-
-    /// Returns the value of the field with the specified name, if any.
-    ///
-    /// `Ok(None)` means the field does not exist; `Err` means the search encountered an error.
-    pub fn field_by_name(&self, name: &str) -> Result<Option<Variant<'m, 'v>>, ArrowError> {
-        // Binary search through the field IDs of this object to find the requested field name.
-        //
-        // NOTE: This does not require a sorted metadata dictionary, because the variant spec
-        // requires object field ids to be lexically sorted by their corresponding string values,
-        // and probing the dictionary for a field id is always O(1) work.
-        let search_result =
-            try_binary_search_range_by(0..self.num_elements, &name, |i| self.field_name(i))?;
-
-        search_result.ok().map(|i| self.field(i)).transpose()
-    }
-}
-
-/// A parsed version of the variant array value header byte.
-#[derive(Clone, Debug, PartialEq)]
-pub(crate) struct VariantListHeader {
-    offset_size: OffsetSizeBytes,
-    is_large: bool,
-}
-
-impl VariantListHeader {
-    pub(crate) fn try_new(header_byte: u8) -> Result<Self, ArrowError> {
-        // The 6 first bits to the left are the value_header and the 2 bits
-        // to the right are the basic type, so we shift to get only the value_header
-        let value_header = header_byte >> 2;
-        let is_large = (value_header & 0x04) != 0; // 3rd bit from the right
-        let field_offset_size_minus_one = value_header & 0x03; // Last two bits
-        let offset_size = OffsetSizeBytes::try_new(field_offset_size_minus_one)?;
-
-        Ok(Self {
-            offset_size,
-            is_large,
-        })
-    }
-}
-
-/// [`Variant`] Array.
-///
-/// NOTE: The "list" naming differs from the variant spec -- which calls it "array" -- in order to be
-/// consistent with Parquet and Arrow type naming. Otherwise, the name would conflict with the
-/// `VariantArray : Array` we must eventually define for variant-typed arrow arrays.
-#[derive(Clone, Debug, PartialEq)]
-pub struct VariantList<'m, 'v> {
-    pub metadata: VariantMetadata<'m>,
-    pub value: &'v [u8],
-    header: VariantListHeader,
-    num_elements: usize,
-    first_offset_byte: usize,
-    first_value_byte: usize,
-}
-
-impl<'m, 'v> VariantList<'m, 'v> {
-    /// Attempts to interpret `value` as a variant array value.
-    ///
-    /// # Validation
-    ///
-    /// This constructor verifies that `value` points to a valid variant array value. In particular,
-    /// that all offsets are in-bounds and point to valid objects.
-    // TODO: How to make the validation non-recursive while still making iterators safely infallible??
-    // See https://github.com/apache/arrow-rs/issues/7711
-    pub fn try_new(metadata: VariantMetadata<'m>, value: &'v [u8]) -> Result<Self, ArrowError> {
-        let header_byte = first_byte_from_slice(value)?;
-        let header = VariantListHeader::try_new(header_byte)?;
-
-        // The size of the num_elements entry in the array value_data is 4 bytes if
-        // is_large is true, otherwise 1 byte.
-        let num_elements_size = match header.is_large {
-            true => OffsetSizeBytes::Four,
-            false => OffsetSizeBytes::One,
-        };
-
-        // Skip the header byte to read the num_elements
-        let num_elements = num_elements_size.unpack_usize(value, 1, 0)?;
-        let first_offset_byte = 1 + num_elements_size as usize;
 
-        let overflow =
-            || ArrowError::InvalidArgumentError("Variant value_byte_length overflow".into());
-
-        // 1.  num_elements + 1
-        let n_offsets = num_elements.checked_add(1).ok_or_else(overflow)?;
-
-        // 2.  (num_elements + 1) * offset_size
-        let value_bytes = n_offsets
-            .checked_mul(header.offset_size as usize)
-            .ok_or_else(overflow)?;
-
-        // 3.  first_offset_byte + ...
-        let first_value_byte = first_offset_byte
-            .checked_add(value_bytes)
-            .ok_or_else(overflow)?;
-
-        let s = Self {
-            metadata,
-            value,
-            header,
-            num_elements,
-            first_offset_byte,
-            first_value_byte,
-        };
-
-        // Iterate over all values of this array in order to validate the field_offset array and
-        // prove that the field values are all in bounds. Otherwise, `iter` might panic on `unwrap`.
-        validate_fallible_iterator(s.iter_checked())?;
-        Ok(s)
-    }
-
-    /// Return the length of this array
-    pub fn len(&self) -> usize {
-        self.num_elements
-    }
-
-    /// Is the array of zero length
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    pub fn get(&self, index: usize) -> Result<Variant<'m, 'v>, ArrowError> {
-        if index >= self.num_elements {
-            return Err(ArrowError::InvalidArgumentError(format!(
-                "Index {} out of bounds for list of length {}",
-                index, self.num_elements,
-            )));
-        }
-
-        // Skip header and num_elements bytes to read the offsets
-        let unpack = |i| {
-            self.header
-                .offset_size
-                .unpack_usize(self.value, self.first_offset_byte, i)
-        };
-
-        // Read the value bytes from the offsets
-        let variant_value_bytes = slice_from_slice(
-            self.value,
-            self.first_value_byte + unpack(index)?..self.first_value_byte + unpack(index + 1)?,
-        )?;
-        let variant = Variant::try_new_with_metadata(self.metadata, variant_value_bytes)?;
-        Ok(variant)
-    }
-
-    /// Iterates over the values of this list
-    pub fn iter(&self) -> impl Iterator<Item = Variant<'m, 'v>> + '_ {
-        // NOTE: It is safe to unwrap because the constructor already made a successful traversal.
-        self.iter_checked().map(Result::unwrap)
-    }
-
-    // Fallible iteration over the fields of this dictionary. The constructor traverses the iterator
-    // to prove it has no errors, so that all other use sites can blindly `unwrap` the result.
-    fn iter_checked(&self) -> impl Iterator<Item = Result<Variant<'m, 'v>, ArrowError>> + '_ {
-        (0..self.len()).map(move |i| self.get(i))
-    }
-}
+mod list;
+mod metadata;
+mod object;
 
 /// Represents a [Parquet Variant]
 ///
@@ -714,7 +199,7 @@ impl<'m, 'v> Variant<'m, 'v> {
     ) -> Result<Self, ArrowError> {
         let value_metadata = first_byte_from_slice(value)?;
         let value_data = slice_from_slice(value, 1..)?;
-        let new_self = match get_basic_type(value_metadata)? {
+        let new_self = match get_basic_type(value_metadata) {
             VariantBasicType::Primitive => match get_primitive_type(value_metadata)? {
                 VariantPrimitiveType::Null => Variant::Null,
                 VariantPrimitiveType::Int8 => Variant::Int8(decoder::decode_int8(value_data)?),
@@ -1383,467 +868,3 @@ impl<'v> From<&'v str> for Variant<'_, 'v> {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_offset() {
-        assert_eq!(OffsetSizeBytes::try_new(0).unwrap(), OffsetSizeBytes::One);
-        assert_eq!(OffsetSizeBytes::try_new(1).unwrap(), OffsetSizeBytes::Two);
-        assert_eq!(OffsetSizeBytes::try_new(2).unwrap(), OffsetSizeBytes::Three);
-        assert_eq!(OffsetSizeBytes::try_new(3).unwrap(), OffsetSizeBytes::Four);
-
-        // everything outside 0-3 must error
-        assert!(OffsetSizeBytes::try_new(4).is_err());
-        assert!(OffsetSizeBytes::try_new(255).is_err());
-    }
-
-    #[test]
-    fn unpack_usize_all_widths() {
-        // One-byte offsets
-        let buf_one = [0x01u8, 0xAB, 0xCD];
-        assert_eq!(
-            OffsetSizeBytes::One.unpack_usize(&buf_one, 0, 0).unwrap(),
-            0x01
-        );
-        assert_eq!(
-            OffsetSizeBytes::One.unpack_usize(&buf_one, 0, 2).unwrap(),
-            0xCD
-        );
-
-        // Two-byte offsets (little-endian 0x1234, 0x5678)
-        let buf_two = [0x34, 0x12, 0x78, 0x56];
-        assert_eq!(
-            OffsetSizeBytes::Two.unpack_usize(&buf_two, 0, 0).unwrap(),
-            0x1234
-        );
-        assert_eq!(
-            OffsetSizeBytes::Two.unpack_usize(&buf_two, 0, 1).unwrap(),
-            0x5678
-        );
-
-        // Three-byte offsets (0x030201 and 0x0000FF)
-        let buf_three = [0x01, 0x02, 0x03, 0xFF, 0x00, 0x00];
-        assert_eq!(
-            OffsetSizeBytes::Three
-                .unpack_usize(&buf_three, 0, 0)
-                .unwrap(),
-            0x030201
-        );
-        assert_eq!(
-            OffsetSizeBytes::Three
-                .unpack_usize(&buf_three, 0, 1)
-                .unwrap(),
-            0x0000FF
-        );
-
-        // Four-byte offsets (0x12345678, 0x90ABCDEF)
-        let buf_four = [0x78, 0x56, 0x34, 0x12, 0xEF, 0xCD, 0xAB, 0x90];
-        assert_eq!(
-            OffsetSizeBytes::Four.unpack_usize(&buf_four, 0, 0).unwrap(),
-            0x1234_5678
-        );
-        assert_eq!(
-            OffsetSizeBytes::Four.unpack_usize(&buf_four, 0, 1).unwrap(),
-            0x90AB_CDEF
-        );
-    }
-
-    #[test]
-    fn unpack_usize_out_of_bounds() {
-        let tiny = [0x00u8]; // deliberately too short
-        assert!(OffsetSizeBytes::Two.unpack_usize(&tiny, 0, 0).is_err());
-        assert!(OffsetSizeBytes::Three.unpack_usize(&tiny, 0, 0).is_err());
-    }
-
-    #[test]
-    fn unpack_simple() {
-        let buf = [
-            0x41, // header
-            0x02, 0x00, // dictionary_size = 2
-            0x00, 0x00, // offset[0] = 0
-            0x05, 0x00, // offset[1] = 5
-            0x09, 0x00, // offset[2] = 9
-        ];
-
-        let width = OffsetSizeBytes::Two;
-
-        // dictionary_size starts immediately after the header
-        let dict_size = width.unpack_usize(&buf, 1, 0).unwrap();
-        assert_eq!(dict_size, 2);
-
-        let first = width.unpack_usize(&buf, 1, 1).unwrap();
-        assert_eq!(first, 0);
-
-        let second = width.unpack_usize(&buf, 1, 2).unwrap();
-        assert_eq!(second, 5);
-
-        let third = width.unpack_usize(&buf, 1, 3).unwrap();
-        assert_eq!(third, 9);
-
-        let err = width.unpack_usize(&buf, 1, 4);
-        assert!(err.is_err())
-    }
-
-    /// `"cat"`, `"dog"` – valid metadata
-    #[test]
-    fn try_new_ok_inline() {
-        let bytes = &[
-            0b0000_0001, // header, offset_size_minus_one=0 and version=1
-            0x02,        // dictionary_size (2 strings)
-            0x00,
-            0x03,
-            0x06,
-            b'c',
-            b'a',
-            b't',
-            b'd',
-            b'o',
-            b'g',
-        ];
-
-        let md = VariantMetadata::try_new(bytes).expect("should parse");
-        assert_eq!(md.dictionary_size(), 2);
-        // Fields
-        assert_eq!(md.get(0).unwrap(), "cat");
-        assert_eq!(md.get(1).unwrap(), "dog");
-
-        // Offsets
-        assert_eq!(md.get_offset(0).unwrap(), 0x00);
-        assert_eq!(md.get_offset(1).unwrap(), 0x03);
-        assert_eq!(md.get_offset(2).unwrap(), 0x06);
-
-        let err = md.get_offset(3).unwrap_err();
-        assert!(
-            matches!(err, ArrowError::InvalidArgumentError(_)),
-            "unexpected error: {err:?}"
-        );
-
-        let fields: Vec<(usize, &str)> = md.iter().enumerate().collect();
-        assert_eq!(fields, vec![(0usize, "cat"), (1usize, "dog")]);
-    }
-
-    /// Too short buffer test (missing one required offset).
-    /// Should error with "metadata shorter than dictionary_size implies".
-    #[test]
-    fn try_new_missing_last_value() {
-        let bytes = &[
-            0b0000_0001, // header, offset_size_minus_one=0 and version=1
-            0x02,        // dictionary_size = 2
-            0x00,
-            0x01,
-            0x02,
-            b'a',
-            b'b', // <-- we'll remove this
-        ];
-
-        let working_md = VariantMetadata::try_new(bytes).expect("should parse");
-        assert_eq!(working_md.dictionary_size(), 2);
-        assert_eq!(working_md.get(0).unwrap(), "a");
-        assert_eq!(working_md.get(1).unwrap(), "b");
-
-        let truncated = &bytes[..bytes.len() - 1];
-
-        let err = VariantMetadata::try_new(truncated).unwrap_err();
-        assert!(
-            matches!(err, ArrowError::InvalidArgumentError(_)),
-            "unexpected error: {err:?}"
-        );
-    }
-
-    #[test]
-    fn try_new_fails_non_monotonic() {
-        // 'cat', 'dog', 'lamb'
-        let bytes = &[
-            0b0000_0001, // header, offset_size_minus_one=0 and version=1
-            0x03,        // dictionary_size
-            0x00,
-            0x02,
-            0x01, // Doesn't increase monotonically
-            0x10,
-            b'c',
-            b'a',
-            b't',
-            b'd',
-            b'o',
-            b'g',
-            b'l',
-            b'a',
-            b'm',
-            b'b',
-        ];
-
-        let err = VariantMetadata::try_new(bytes).unwrap_err();
-        assert!(
-            matches!(err, ArrowError::InvalidArgumentError(_)),
-            "unexpected error: {err:?}"
-        );
-    }
-
-    #[test]
-    fn try_new_truncated_offsets_inline() {
-        // Missing final offset
-        let bytes = &[0b0000_0001, 0x02, 0x00, 0x01];
-
-        let err = VariantMetadata::try_new(bytes).unwrap_err();
-        assert!(
-            matches!(err, ArrowError::InvalidArgumentError(_)),
-            "unexpected error: {err:?}"
-        );
-    }
-
-    #[test]
-    fn test_variant_object_simple() {
-        // Create metadata with field names: "age", "name", "active" (sorted)
-        // Header: version=1, sorted=1, offset_size=1 (offset_size_minus_one=0)
-        // So header byte = 00_0_1_0001 = 0x10
-        let metadata_bytes = vec![
-            0b0001_0001,
-            3, // dictionary size
-            0, // "active"
-            6, // "age"
-            9, // "name"
-            13,
-            b'a',
-            b'c',
-            b't',
-            b'i',
-            b'v',
-            b'e',
-            b'a',
-            b'g',
-            b'e',
-            b'n',
-            b'a',
-            b'm',
-            b'e',
-        ];
-        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
-
-        // Create object value data for: {"active": true, "age": 42, "name": "hello"}
-        // Field IDs in sorted order: [0, 1, 2] (active, age, name)
-        // Header: basic_type=2, field_offset_size_minus_one=0, field_id_size_minus_one=0, is_large=0
-        // value_header = 0000_00_00 = 0x00
-        // So header byte = (0x00 << 2) | 2 = 0x02
-        let object_value = vec![
-            0x02, // header: basic_type=2, value_header=0x00
-            3,    // num_elements = 3
-            // Field IDs (1 byte each): active=0, age=1, name=2
-            0, 1, 2,
-            // Field offsets (1 byte each): 4 offsets total
-            0, // offset to first value (boolean true)
-            1, // offset to second value (int8)
-            3, // offset to third value (short string)
-            9, // end offset
-            // Values:
-            0x04, // boolean true: primitive_header=1, basic_type=0 -> (1 << 2) | 0 = 0x04
-            0x0C,
-            42, // int8: primitive_header=3, basic_type=0 -> (3 << 2) | 0 = 0x0C, then value 42
-            0x15, b'h', b'e', b'l', b'l',
-            b'o', // short string: length=5, basic_type=1 -> (5 << 2) | 1 = 0x15
-        ];
-
-        let variant_obj = VariantObject::try_new(metadata, &object_value).unwrap();
-
-        // Test basic properties
-        assert_eq!(variant_obj.len(), 3);
-        assert!(!variant_obj.is_empty());
-
-        // Test field access
-        let active_field = variant_obj.field_by_name("active").unwrap();
-        assert!(active_field.is_some());
-        assert_eq!(active_field.unwrap().as_boolean(), Some(true));
-
-        let age_field = variant_obj.field_by_name("age").unwrap();
-        assert!(age_field.is_some());
-        assert_eq!(age_field.unwrap().as_int8(), Some(42));
-
-        let name_field = variant_obj.field_by_name("name").unwrap();
-        assert!(name_field.is_some());
-        assert_eq!(name_field.unwrap().as_string(), Some("hello"));
-
-        // Test non-existent field
-        let missing_field = variant_obj.field_by_name("missing").unwrap();
-        assert!(missing_field.is_none());
-
-        // Test fields iterator
-        let fields: Vec<_> = variant_obj.iter().collect();
-        assert_eq!(fields.len(), 3);
-
-        // Fields should be in sorted order: active, age, name
-        assert_eq!(fields[0].0, "active");
-        assert_eq!(fields[0].1.as_boolean(), Some(true));
-
-        assert_eq!(fields[1].0, "age");
-        assert_eq!(fields[1].1.as_int8(), Some(42));
-
-        assert_eq!(fields[2].0, "name");
-        assert_eq!(fields[2].1.as_string(), Some("hello"));
-    }
-
-    #[test]
-    fn test_variant_object_empty() {
-        // Create metadata with no fields
-        let metadata_bytes = vec![
-            0x11, // header: version=1, sorted=0, offset_size_minus_one=0
-            0,    // dictionary_size = 0
-            0,    // offset[0] = 0 (end of dictionary)
-        ];
-        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
-
-        // Create empty object value data: {}
-        let object_value = vec![
-            0x02, // header: basic_type=2, value_header=0x00
-            0,    // num_elements = 0
-            0,    // single offset pointing to end
-                  // No field IDs, no values
-        ];
-
-        let variant_obj = VariantObject::try_new(metadata, &object_value).unwrap();
-
-        // Test basic properties
-        assert_eq!(variant_obj.len(), 0);
-        assert!(variant_obj.is_empty());
-
-        // Test field access on empty object
-        let missing_field = variant_obj.field_by_name("anything").unwrap();
-        assert!(missing_field.is_none());
-
-        // Test fields iterator on empty object
-        let fields: Vec<_> = variant_obj.iter().collect();
-        assert_eq!(fields.len(), 0);
-    }
-
-    #[test]
-    fn test_variant_list_simple() {
-        // Create simple metadata (empty dictionary for this test)
-        let metadata_bytes = vec![
-            0x01, // header: version=1, sorted=0, offset_size_minus_one=0
-            0,    // dictionary_size = 0
-            0,    // offset[0] = 0 (end of dictionary)
-        ];
-        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
-
-        // Create list value data for: [42, true, "hi"]
-        // Header: basic_type=3 (array), field_offset_size_minus_one=0, is_large=0
-        // value_header = 0000_0_0_00 = 0x00
-        // So header byte = (0x00 << 2) | 3 = 0x03
-        let list_value = vec![
-            0x03, // header: basic_type=3, value_header=0x00
-            3,    // num_elements = 3
-            // Offsets (1 byte each): 4 offsets total
-            0, // offset to first value (int8)
-            2, // offset to second value (boolean true)
-            3, // offset to third value (short string)
-            6, // end offset
-            // Values:
-            0x0C,
-            42,   // int8: primitive_header=3, basic_type=0 -> (3 << 2) | 0 = 0x0C, then value 42
-            0x04, // boolean true: primitive_header=1, basic_type=0 -> (1 << 2) | 0 = 0x04
-            0x09, b'h', b'i', // short string: length=2, basic_type=1 -> (2 << 2) | 1 = 0x09
-        ];
-
-        let variant_list = VariantList::try_new(metadata, &list_value).unwrap();
-
-        // Test basic properties
-        assert_eq!(variant_list.len(), 3);
-        assert!(!variant_list.is_empty());
-
-        // Test individual element access
-        let elem0 = variant_list.get(0).unwrap();
-        assert_eq!(elem0.as_int8(), Some(42));
-
-        let elem1 = variant_list.get(1).unwrap();
-        assert_eq!(elem1.as_boolean(), Some(true));
-
-        let elem2 = variant_list.get(2).unwrap();
-        assert_eq!(elem2.as_string(), Some("hi"));
-
-        // Test out of bounds access
-        let out_of_bounds = variant_list.get(3);
-        assert!(out_of_bounds.is_err());
-        assert!(matches!(
-            out_of_bounds.unwrap_err(),
-            ArrowError::InvalidArgumentError(ref msg) if msg.contains("out of bounds")
-        ));
-
-        // Test values iterator
-        let values: Vec<_> = variant_list.iter().collect();
-        assert_eq!(values.len(), 3);
-        assert_eq!(values[0].as_int8(), Some(42));
-        assert_eq!(values[1].as_boolean(), Some(true));
-        assert_eq!(values[2].as_string(), Some("hi"));
-    }
-
-    #[test]
-    fn test_variant_list_empty() {
-        // Create simple metadata (empty dictionary)
-        let metadata_bytes = vec![
-            0x01, // header: version=1, sorted=0, offset_size_minus_one=0
-            0,    // dictionary_size = 0
-            0,    // offset[0] = 0 (end of dictionary)
-        ];
-        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
-
-        // Create empty list value data: []
-        let list_value = vec![
-            0x03, // header: basic_type=3, value_header=0x00
-            0,    // num_elements = 0
-            0,    // single offset pointing to end
-                  // No values
-        ];
-
-        let variant_list = VariantList::try_new(metadata, &list_value).unwrap();
-
-        // Test basic properties
-        assert_eq!(variant_list.len(), 0);
-        assert!(variant_list.is_empty());
-
-        // Test out of bounds access on empty list
-        let out_of_bounds = variant_list.get(0);
-        assert!(out_of_bounds.is_err());
-
-        // Test values iterator on empty list
-        let values: Vec<_> = variant_list.iter().collect();
-        assert_eq!(values.len(), 0);
-    }
-
-    #[test]
-    fn test_variant_list_large() {
-        // Create simple metadata (empty dictionary)
-        let metadata_bytes = vec![
-            0x01, // header: version=1, sorted=0, offset_size_minus_one=0
-            0,    // dictionary_size = 0
-            0,    // offset[0] = 0 (end of dictionary)
-        ];
-        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
-
-        // Create large list value data with 2-byte offsets: [null, false]
-        // Header: is_large=1, field_offset_size_minus_one=1, basic_type=3 (array)
-        let list_bytes = vec![
-            0x17, // header = 000_1_01_11 = 0x17
-            2, 0, 0, 0, // num_elements = 2 (4 bytes because is_large=1)
-            // Offsets (2 bytes each): 3 offsets total
-            0x00, 0x00, 0x01, 0x00, // first value (null)
-            0x02, 0x00, // second value (boolean false)
-            // Values:
-            0x00, // null: primitive_header=0, basic_type=0 -> (0 << 2) | 0 = 0x00
-            0x08, // boolean false: primitive_header=2, basic_type=0 -> (2 << 2) | 0 = 0x08
-        ];
-
-        let variant_list = VariantList::try_new(metadata, &list_bytes).unwrap();
-
-        // Test basic properties
-        assert_eq!(variant_list.len(), 2);
-        assert!(!variant_list.is_empty());
-
-        // Test individual element access
-        let elem0 = variant_list.get(0).unwrap();
-        assert_eq!(elem0.as_null(), Some(()));
-
-        let elem1 = variant_list.get(1).unwrap();
-        assert_eq!(elem1.as_boolean(), Some(false));
-    }
-}
diff --git a/parquet-variant/src/variant/list.rs b/parquet-variant/src/variant/list.rs
new file mode 100644
index 0000000000..d9fd20eacc
--- /dev/null
+++ b/parquet-variant/src/variant/list.rs
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use crate::decoder::OffsetSizeBytes;
+use crate::utils::{first_byte_from_slice, slice_from_slice, validate_fallible_iterator};
+use crate::variant::{Variant, VariantMetadata};
+
+use arrow_schema::ArrowError;
+
+/// A parsed version of the variant array value header byte.
+#[derive(Clone, Debug, PartialEq)]
+pub(crate) struct VariantListHeader {
+    offset_size: OffsetSizeBytes,
+    is_large: bool,
+}
+
+impl VariantListHeader {
+    pub(crate) fn try_new(header_byte: u8) -> Result<Self, ArrowError> {
+        // The 6 first bits to the left are the value_header and the 2 bits
+        // to the right are the basic type, so we shift to get only the value_header
+        let value_header = header_byte >> 2;
+        let is_large = (value_header & 0x04) != 0; // 3rd bit from the right
+        let field_offset_size_minus_one = value_header & 0x03; // Last two bits
+        let offset_size = OffsetSizeBytes::try_new(field_offset_size_minus_one)?;
+
+        Ok(Self {
+            offset_size,
+            is_large,
+        })
+    }
+}
+
+/// [`Variant`] Array.
+///
+/// NOTE: The "list" naming differs from the variant spec -- which calls it "array" -- in order to be
+/// consistent with Parquet and Arrow type naming. Otherwise, the name would conflict with the
+/// `VariantArray : Array` we must eventually define for variant-typed arrow arrays.
+#[derive(Clone, Debug, PartialEq)]
+pub struct VariantList<'m, 'v> {
+    pub metadata: VariantMetadata<'m>,
+    pub value: &'v [u8],
+    header: VariantListHeader,
+    num_elements: usize,
+    first_offset_byte: usize,
+    first_value_byte: usize,
+}
+
+impl<'m, 'v> VariantList<'m, 'v> {
+    /// Attempts to interpret `value` as a variant array value.
+    ///
+    /// # Validation
+    ///
+    /// This constructor verifies that `value` points to a valid variant array value. In particular,
+    /// that all offsets are in-bounds and point to valid objects.
+    // TODO: How to make the validation non-recursive while still making iterators safely infallible??
+    // See https://github.com/apache/arrow-rs/issues/7711
+    pub fn try_new(metadata: VariantMetadata<'m>, value: &'v [u8]) -> Result<Self, ArrowError> {
+        let header_byte = first_byte_from_slice(value)?;
+        let header = VariantListHeader::try_new(header_byte)?;
+
+        // The size of the num_elements entry in the array value_data is 4 bytes if
+        // is_large is true, otherwise 1 byte.
+        let num_elements_size = match header.is_large {
+            true => OffsetSizeBytes::Four,
+            false => OffsetSizeBytes::One,
+        };
+
+        // Skip the header byte to read the num_elements
+        let num_elements = num_elements_size.unpack_usize(value, 1, 0)?;
+        let first_offset_byte = 1 + num_elements_size as usize;
+
+        let overflow =
+            || ArrowError::InvalidArgumentError("Variant value_byte_length overflow".into());
+
+        // 1.  num_elements + 1
+        let n_offsets = num_elements.checked_add(1).ok_or_else(overflow)?;
+
+        // 2.  (num_elements + 1) * offset_size
+        let value_bytes = n_offsets
+            .checked_mul(header.offset_size as usize)
+            .ok_or_else(overflow)?;
+
+        // 3.  first_offset_byte + ...
+        let first_value_byte = first_offset_byte
+            .checked_add(value_bytes)
+            .ok_or_else(overflow)?;
+
+        let new_self = Self {
+            metadata,
+            value,
+            header,
+            num_elements,
+            first_offset_byte,
+            first_value_byte,
+        };
+
+        // Iterate over all values of this array in order to validate the field_offset array and
+        // prove that the field values are all in bounds. Otherwise, `iter` might panic on `unwrap`.
+        validate_fallible_iterator(new_self.iter_checked())?;
+        Ok(new_self)
+    }
+
+    /// Return the length of this array
+    pub fn len(&self) -> usize {
+        self.num_elements
+    }
+
+    /// Is the array of zero length
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn get(&self, index: usize) -> Result<Variant<'m, 'v>, ArrowError> {
+        if index >= self.num_elements {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Index {} out of bounds for list of length {}",
+                index, self.num_elements,
+            )));
+        }
+
+        // Skip header and num_elements bytes to read the offsets
+        let unpack = |i| {
+            self.header
+                .offset_size
+                .unpack_usize(self.value, self.first_offset_byte, i)
+        };
+
+        // Read the value bytes from the offsets
+        let variant_value_bytes = slice_from_slice(
+            self.value,
+            self.first_value_byte + unpack(index)?..self.first_value_byte + unpack(index + 1)?,
+        )?;
+        let variant = Variant::try_new_with_metadata(self.metadata, variant_value_bytes)?;
+        Ok(variant)
+    }
+
+    /// Iterates over the values of this list
+    pub fn iter(&self) -> impl Iterator<Item = Variant<'m, 'v>> + '_ {
+        // NOTE: It is safe to unwrap because the constructor already made a successful traversal.
+        self.iter_checked().map(Result::unwrap)
+    }
+
+    // Fallible iteration over the fields of this dictionary. The constructor traverses the iterator
+    // to prove it has no errors, so that all other use sites can blindly `unwrap` the result.
+    fn iter_checked(&self) -> impl Iterator<Item = Result<Variant<'m, 'v>, ArrowError>> + '_ {
+        (0..self.len()).map(move |i| self.get(i))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_variant_list_simple() {
+        // Create simple metadata (empty dictionary for this test)
+        let metadata_bytes = vec![
+            0x01, // header: version=1, sorted=0, offset_size_minus_one=0
+            0,    // dictionary_size = 0
+            0,    // offset[0] = 0 (end of dictionary)
+        ];
+        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
+
+        // Create list value data for: [42, true, "hi"]
+        // Header: basic_type=3 (array), field_offset_size_minus_one=0, is_large=0
+        // value_header = 0000_0_0_00 = 0x00
+        // So header byte = (0x00 << 2) | 3 = 0x03
+        let list_value = vec![
+            0x03, // header: basic_type=3, value_header=0x00
+            3,    // num_elements = 3
+            // Offsets (1 byte each): 4 offsets total
+            0, // offset to first value (int8)
+            2, // offset to second value (boolean true)
+            3, // offset to third value (short string)
+            6, // end offset
+            // Values:
+            0x0C,
+            42,   // int8: primitive_header=3, basic_type=0 -> (3 << 2) | 0 = 0x0C, then value 42
+            0x04, // boolean true: primitive_header=1, basic_type=0 -> (1 << 2) | 0 = 0x04
+            0x09, b'h', b'i', // short string: length=2, basic_type=1 -> (2 << 2) | 1 = 0x09
+        ];
+
+        let variant_list = VariantList::try_new(metadata, &list_value).unwrap();
+
+        // Test basic properties
+        assert_eq!(variant_list.len(), 3);
+        assert!(!variant_list.is_empty());
+
+        // Test individual element access
+        let elem0 = variant_list.get(0).unwrap();
+        assert_eq!(elem0.as_int8(), Some(42));
+
+        let elem1 = variant_list.get(1).unwrap();
+        assert_eq!(elem1.as_boolean(), Some(true));
+
+        let elem2 = variant_list.get(2).unwrap();
+        assert_eq!(elem2.as_string(), Some("hi"));
+
+        // Test out of bounds access
+        let out_of_bounds = variant_list.get(3);
+        assert!(out_of_bounds.is_err());
+        assert!(matches!(
+            out_of_bounds.unwrap_err(),
+            ArrowError::InvalidArgumentError(ref msg) if msg.contains("out of bounds")
+        ));
+
+        // Test values iterator
+        let values: Vec<_> = variant_list.iter().collect();
+        assert_eq!(values.len(), 3);
+        assert_eq!(values[0].as_int8(), Some(42));
+        assert_eq!(values[1].as_boolean(), Some(true));
+        assert_eq!(values[2].as_string(), Some("hi"));
+    }
+
+    #[test]
+    fn test_variant_list_empty() {
+        // Create simple metadata (empty dictionary)
+        let metadata_bytes = vec![
+            0x01, // header: version=1, sorted=0, offset_size_minus_one=0
+            0,    // dictionary_size = 0
+            0,    // offset[0] = 0 (end of dictionary)
+        ];
+        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
+
+        // Create empty list value data: []
+        let list_value = vec![
+            0x03, // header: basic_type=3, value_header=0x00
+            0,    // num_elements = 0
+            0,    // single offset pointing to end
+                  // No values
+        ];
+
+        let variant_list = VariantList::try_new(metadata, &list_value).unwrap();
+
+        // Test basic properties
+        assert_eq!(variant_list.len(), 0);
+        assert!(variant_list.is_empty());
+
+        // Test out of bounds access on empty list
+        let out_of_bounds = variant_list.get(0);
+        assert!(out_of_bounds.is_err());
+
+        // Test values iterator on empty list
+        let values: Vec<_> = variant_list.iter().collect();
+        assert_eq!(values.len(), 0);
+    }
+
+    #[test]
+    fn test_variant_list_large() {
+        // Create simple metadata (empty dictionary)
+        let metadata_bytes = vec![
+            0x01, // header: version=1, sorted=0, offset_size_minus_one=0
+            0,    // dictionary_size = 0
+            0,    // offset[0] = 0 (end of dictionary)
+        ];
+        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
+
+        // Create large list value data with 2-byte offsets: [null, false]
+        // Header: is_large=1, field_offset_size_minus_one=1, basic_type=3 (array)
+        let list_bytes = vec![
+            0x17, // header = 000_1_01_11 = 0x17
+            2, 0, 0, 0, // num_elements = 2 (4 bytes because is_large=1)
+            // Offsets (2 bytes each): 3 offsets total
+            0x00, 0x00, // offset[0] = 0 (first value: null)
+            0x01, 0x00, // offset[1] = 1 (second value: boolean false)
+            0x02, 0x00, // offset[2] = 2 (end of values)
+            // Values:
+            0x00, // null: primitive_header=0, basic_type=0 -> (0 << 2) | 0 = 0x00
+            0x08, // boolean false: primitive_header=2, basic_type=0 -> (2 << 2) | 0 = 0x08
+        ];
+
+        let variant_list = VariantList::try_new(metadata, &list_bytes).unwrap();
+
+        // Test basic properties
+        assert_eq!(variant_list.len(), 2);
+        assert!(!variant_list.is_empty());
+
+        // Test individual element access
+        let elem0 = variant_list.get(0).unwrap();
+        assert_eq!(elem0.as_null(), Some(()));
+
+        let elem1 = variant_list.get(1).unwrap();
+        assert_eq!(elem1.as_boolean(), Some(false));
+    }
+}
diff --git a/parquet-variant/src/variant/metadata.rs b/parquet-variant/src/variant/metadata.rs
new file mode 100644
index 0000000000..bfefeb506d
--- /dev/null
+++ b/parquet-variant/src/variant/metadata.rs
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::decoder::OffsetSizeBytes;
+use crate::utils::{
+    first_byte_from_slice, slice_from_slice, string_from_slice, validate_fallible_iterator,
+};
+
+use arrow_schema::ArrowError;
+
+/// Header structure for [`VariantMetadata`]
+#[derive(Clone, Debug, Copy, PartialEq)]
+pub(crate) struct VariantMetadataHeader {
+    version: u8,
+    is_sorted: bool,
+    /// Note: This is `offset_size_minus_one` + 1
+    offset_size: OffsetSizeBytes,
+}
+
+// According to the spec this is currently always = 1, and so we store this const for validation
+// purposes and to make that visible.
+const CORRECT_VERSION_VALUE: u8 = 1;
+
+impl VariantMetadataHeader {
+    /// Tries to construct the variant metadata header, which has the form
+    ///
+    /// ```text
+    ///              7     6  5   4  3             0
+    ///             +-------+---+---+---------------+
+    /// header      |       |   |   |    version    |
+    ///             +-------+---+---+---------------+
+    ///                 ^         ^
+    ///                 |         +-- sorted_strings
+    ///                 +-- offset_size_minus_one
+    /// ```
+    ///
+    /// The version is a 4-bit value that must always contain the value 1.
+    /// - sorted_strings is a 1-bit value indicating whether dictionary strings are sorted and unique.
+    /// - offset_size_minus_one is a 2-bit value providing the number of bytes per dictionary size and offset field.
+    /// - The actual number of bytes, offset_size, is offset_size_minus_one + 1
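+    ///
+    /// For example (illustrative), a header byte of `0b0101_0001` decodes to `version = 1`,
+    /// `sorted_strings = true`, and `offset_size = 2` bytes.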
+    pub(crate) fn try_new(header_byte: u8) -> Result<Self, ArrowError> {
+        let version = header_byte & 0x0F; // First four bits
+        if version != CORRECT_VERSION_VALUE {
+            let err_msg = format!(
+                "The version bytes in the header is not 
{CORRECT_VERSION_VALUE}, got {:b}",
+                version
+            );
+            return Err(ArrowError::InvalidArgumentError(err_msg));
+        }
+        let is_sorted = (header_byte & 0x10) != 0; // Fifth bit
+        let offset_size_minus_one = header_byte >> 6; // Last two bits
+        Ok(Self {
+            version,
+            is_sorted,
+            offset_size: OffsetSizeBytes::try_new(offset_size_minus_one)?,
+        })
+    }
+}
+
+/// [`Variant`] Metadata
+///
+/// See the [Variant Spec] file for more information
+///
+/// [`Variant`]: crate::Variant
+/// [Variant Spec]: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#metadata-encoding
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub struct VariantMetadata<'m> {
+    bytes: &'m [u8],
+    header: VariantMetadataHeader,
+    dict_size: usize,
+    dictionary_key_start_byte: usize,
+}
+
+impl<'m> VariantMetadata<'m> {
+    /// View the raw bytes (needed by very low-level decoders)
+    #[inline]
+    pub const fn as_bytes(&self) -> &'m [u8] {
+        self.bytes
+    }
+
+    /// Attempts to interpret `bytes` as a variant metadata instance.
+    ///
+    /// # Validation
+    ///
+    /// This constructor verifies that `bytes` points to a valid variant metadata instance. In
+    /// particular, all offsets are in-bounds and point to valid utf8 strings.
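+    ///
+    /// # Example (illustrative)
+    ///
+    /// ```ignore
+    /// // A two-entry dictionary ("cat", "dog"); the byte layout mirrors `try_new_ok_inline` below.
+    /// let bytes = [0x01, 0x02, 0x00, 0x03, 0x06, b'c', b'a', b't', b'd', b'o', b'g'];
+    /// let metadata = VariantMetadata::try_new(&bytes)?;
+    /// assert_eq!(metadata.dictionary_size(), 2);
+    /// assert_eq!(metadata.get(0)?, "cat");
+    /// ```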
+    pub fn try_new(bytes: &'m [u8]) -> Result<Self, ArrowError> {
+        let header_byte = first_byte_from_slice(bytes)?;
+        let header = VariantMetadataHeader::try_new(header_byte)?;
+
+        // Offset 1, index 0 because first element after header is dictionary size
+        let dict_size = header.offset_size.unpack_usize(bytes, 1, 0)?;
+
+        // Calculate the starting offset of the dictionary string bytes.
+        //
+        // Value header, dict_size (offset_size bytes), and dict_size+1 offsets
+        // = 1 + offset_size + (dict_size + 1) * offset_size
+        // = (dict_size + 2) * offset_size + 1
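+        //
+        // For example (illustrative): with offset_size = 1 and dict_size = 2, the dictionary
+        // string bytes start at byte (2 + 2) * 1 + 1 = 5, as in `try_new_ok_inline` below.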
+        let dictionary_key_start_byte = dict_size
+            .checked_add(2)
+            .and_then(|n| n.checked_mul(header.offset_size as usize))
+            .and_then(|n| n.checked_add(1))
+            .ok_or_else(|| ArrowError::InvalidArgumentError("metadata length overflow".into()))?;
+        let new_self = Self {
+            bytes,
+            header,
+            dict_size,
+            dictionary_key_start_byte,
+        };
+
+        // Iterate over all string keys in this dictionary in order to validate the offset array and
+        // prove that the string bytes are all in bounds. Otherwise, `iter` might panic on `unwrap`.
+        validate_fallible_iterator(new_self.iter_checked())?;
+        Ok(new_self)
+    }
+
+    /// Whether the dictionary keys are sorted and unique
+    pub fn is_sorted(&self) -> bool {
+        self.header.is_sorted
+    }
+
+    /// Get the dictionary size
+    pub fn dictionary_size(&self) -> usize {
+        self.dict_size
+    }
+
+    /// The variant protocol version
+    pub fn version(&self) -> u8 {
+        self.header.version
+    }
+
+    /// Gets an offset array entry by index.
+    ///
+    /// This offset is an index into the dictionary, at the boundary between string `i-1` and string
+    /// `i`. See [`Self::get`] to retrieve a specific dictionary entry.
+    fn get_offset(&self, i: usize) -> Result<usize, ArrowError> {
+        // Skipping the header byte (setting byte_offset = 1) and the dictionary_size (setting offset_index +1)
+        let bytes = slice_from_slice(self.bytes, ..self.dictionary_key_start_byte)?;
+        self.header.offset_size.unpack_usize(bytes, 1, i + 1)
+    }
+
+    /// Gets a dictionary entry by index
+    pub fn get(&self, i: usize) -> Result<&'m str, ArrowError> {
+        let dictionary_keys_bytes = slice_from_slice(self.bytes, self.dictionary_key_start_byte..)?;
+        let byte_range = self.get_offset(i)?..self.get_offset(i + 1)?;
+        string_from_slice(dictionary_keys_bytes, byte_range)
+    }
+
+    /// Get all dictionary entries as an Iterator of strings
+    pub fn iter(&self) -> impl Iterator<Item = &'m str> + '_ {
+        // NOTE: It is safe to unwrap because the constructor already made a successful traversal.
+        self.iter_checked().map(Result::unwrap)
+    }
+
+    // Fallible iteration over the fields of this dictionary. The constructor traverses the iterator
+    // to prove it has no errors, so that all other use sites can blindly `unwrap` the result.
+    fn iter_checked(&self) -> impl Iterator<Item = Result<&'m str, ArrowError>> + '_ {
+        (0..self.dict_size).map(move |i| self.get(i))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// `"cat"`, `"dog"` – valid metadata
+    #[test]
+    fn try_new_ok_inline() {
+        let bytes = &[
+            0b0000_0001, // header, offset_size_minus_one=0 and version=1
+            0x02,        // dictionary_size (2 strings)
+            0x00,
+            0x03,
+            0x06,
+            b'c',
+            b'a',
+            b't',
+            b'd',
+            b'o',
+            b'g',
+        ];
+
+        let md = VariantMetadata::try_new(bytes).expect("should parse");
+        assert_eq!(md.dictionary_size(), 2);
+        // Fields
+        assert_eq!(md.get(0).unwrap(), "cat");
+        assert_eq!(md.get(1).unwrap(), "dog");
+
+        // Offsets
+        assert_eq!(md.get_offset(0).unwrap(), 0x00);
+        assert_eq!(md.get_offset(1).unwrap(), 0x03);
+        assert_eq!(md.get_offset(2).unwrap(), 0x06);
+
+        let err = md.get_offset(3).unwrap_err();
+        assert!(
+            matches!(err, ArrowError::InvalidArgumentError(_)),
+            "unexpected error: {err:?}"
+        );
+
+        let fields: Vec<(usize, &str)> = md.iter().enumerate().collect();
+        assert_eq!(fields, vec![(0usize, "cat"), (1usize, "dog")]);
+    }
+
+    /// Too short buffer test (missing one required offset).
+    /// Should error with "metadata shorter than dictionary_size implies".
+    #[test]
+    fn try_new_missing_last_value() {
+        let bytes = &[
+            0b0000_0001, // header, offset_size_minus_one=0 and version=1
+            0x02,        // dictionary_size = 2
+            0x00,
+            0x01,
+            0x02,
+            b'a',
+            b'b', // <-- we'll remove this
+        ];
+
+        let working_md = VariantMetadata::try_new(bytes).expect("should parse");
+        assert_eq!(working_md.dictionary_size(), 2);
+        assert_eq!(working_md.get(0).unwrap(), "a");
+        assert_eq!(working_md.get(1).unwrap(), "b");
+
+        let truncated = &bytes[..bytes.len() - 1];
+
+        let err = VariantMetadata::try_new(truncated).unwrap_err();
+        assert!(
+            matches!(err, ArrowError::InvalidArgumentError(_)),
+            "unexpected error: {err:?}"
+        );
+    }
+
+    #[test]
+    fn try_new_fails_non_monotonic() {
+        // 'cat', 'dog', 'lamb'
+        let bytes = &[
+            0b0000_0001, // header, offset_size_minus_one=0 and version=1
+            0x03,        // dictionary_size
+            0x00,
+            0x02,
+            0x01, // Doesn't increase monotonically
+            0x10,
+            b'c',
+            b'a',
+            b't',
+            b'd',
+            b'o',
+            b'g',
+            b'l',
+            b'a',
+            b'm',
+            b'b',
+        ];
+
+        let err = VariantMetadata::try_new(bytes).unwrap_err();
+        assert!(
+            matches!(err, ArrowError::InvalidArgumentError(_)),
+            "unexpected error: {err:?}"
+        );
+    }
+
+    #[test]
+    fn try_new_truncated_offsets_inline() {
+        // Missing final offset
+        let bytes = &[0b0000_0001, 0x02, 0x00, 0x01];
+
+        let err = VariantMetadata::try_new(bytes).unwrap_err();
+        assert!(
+            matches!(err, ArrowError::InvalidArgumentError(_)),
+            "unexpected error: {err:?}"
+        );
+    }
+}
diff --git a/parquet-variant/src/variant/object.rs b/parquet-variant/src/variant/object.rs
new file mode 100644
index 0000000000..471b94ccdb
--- /dev/null
+++ b/parquet-variant/src/variant/object.rs
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use crate::decoder::OffsetSizeBytes;
+use crate::utils::{
+    first_byte_from_slice, slice_from_slice, try_binary_search_range_by, validate_fallible_iterator,
+};
+use crate::variant::{Variant, VariantMetadata};
+
+use arrow_schema::ArrowError;
+
+/// Header structure for [`VariantObject`]
+#[derive(Clone, Debug, PartialEq)]
+pub(crate) struct VariantObjectHeader {
+    field_offset_size: OffsetSizeBytes,
+    field_id_size: OffsetSizeBytes,
+    is_large: bool,
+}
+
+impl VariantObjectHeader {
+    pub(crate) fn try_new(header_byte: u8) -> Result<Self, ArrowError> {
+        // Parse the header byte to get object parameters
+        let value_header = header_byte >> 2;
+        let field_offset_size_minus_one = value_header & 0x03; // Last 2 bits
+        let field_id_size_minus_one = (value_header >> 2) & 0x03; // Next 2 bits
+        let is_large = (value_header & 0x10) != 0; // 5th bit
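+
+        // For example (illustrative): a header byte of 0b0100_0110 yields value_header 0b01_0001,
+        // i.e. is_large = true, field_id_size = 1 byte, field_offset_size = 2 bytes.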
+
+        Ok(Self {
+            field_offset_size: OffsetSizeBytes::try_new(field_offset_size_minus_one)?,
+            field_id_size: OffsetSizeBytes::try_new(field_id_size_minus_one)?,
+            is_large,
+        })
+    }
+}
+
+/// A [`Variant`] Object (struct with named fields).
+#[derive(Clone, Debug, PartialEq)]
+pub struct VariantObject<'m, 'v> {
+    pub metadata: VariantMetadata<'m>,
+    pub value: &'v [u8],
+    header: VariantObjectHeader,
+    num_elements: usize,
+    field_ids_start_byte: usize,
+    field_offsets_start_byte: usize,
+    values_start_byte: usize,
+}
+
+impl<'m, 'v> VariantObject<'m, 'v> {
+    /// Attempts to interpret `value` as a variant object value.
+    ///
+    /// # Validation
+    ///
+    /// This constructor verifies that `value` points to a valid variant object value. In
+    /// particular, that all field ids exist in `metadata`, and all offsets are in-bounds and point
+    /// to valid objects.
+    // TODO: How to make the validation non-recursive while still making iterators safely infallible??
+    // See https://github.com/apache/arrow-rs/issues/7711
+    pub fn try_new(metadata: VariantMetadata<'m>, value: &'v [u8]) -> Result<Self, ArrowError> {
+        let header_byte = first_byte_from_slice(value)?;
+        let header = VariantObjectHeader::try_new(header_byte)?;
+
+        // Determine num_elements size based on is_large flag
+        let num_elements_size = if header.is_large {
+            OffsetSizeBytes::Four
+        } else {
+            OffsetSizeBytes::One
+        };
+
+        // Parse num_elements
+        let num_elements = num_elements_size.unpack_usize(value, 1, 0)?;
+
+        // Calculate byte offsets for different sections
+        let field_ids_start_byte = 1 + num_elements_size as usize;
+        let field_offsets_start_byte =
+            field_ids_start_byte + num_elements * header.field_id_size as usize;
+        let values_start_byte =
+            field_offsets_start_byte + (num_elements + 1) * header.field_offset_size as usize;
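+
+        // For example (illustrative): with is_large = false, 1-byte field ids, 1-byte offsets and
+        // num_elements = 3, field ids start at byte 2, field offsets at byte 5, and values at
+        // byte 9, as in `test_variant_object_simple` below.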
+
+        // Spec says: "The last field_offset points to the byte after the end of the last value"
+        //
+        // Use the last offset as a bounds check. The iterator check below doesn't use it -- offsets
+        // are not monotonic -- so we have to check separately here.
+        let last_field_offset =
+            header
+                .field_offset_size
+                .unpack_usize(value, field_offsets_start_byte, num_elements)?;
+        if values_start_byte + last_field_offset > value.len() {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Last field offset value {} at offset {} is outside the value 
slice of length {}",
+                last_field_offset,
+                values_start_byte,
+                value.len()
+            )));
+        }
+
+        let new_self = Self {
+            metadata,
+            value,
+            header,
+            num_elements,
+            field_ids_start_byte,
+            field_offsets_start_byte,
+            values_start_byte,
+        };
+
+        // Iterate over all fields of this object in order to validate the field_id and field_offset
+        // arrays, and also to prove the field values are all in bounds. Otherwise, `iter` might
+        // panic on `unwrap`.
+        validate_fallible_iterator(new_self.iter_checked())?;
+        Ok(new_self)
+    }
+
+    /// Returns the number of key-value pairs in this object
+    pub fn len(&self) -> usize {
+        self.num_elements
+    }
+
+    /// Returns true if the object contains no key-value pairs
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Get a field's value by index in `0..self.len()`
+    pub fn field(&self, i: usize) -> Result<Variant<'m, 'v>, ArrowError> {
+        let start_offset = self.header.field_offset_size.unpack_usize(
+            self.value,
+            self.field_offsets_start_byte,
+            i,
+        )?;
+        let value_bytes = slice_from_slice(self.value, self.values_start_byte + start_offset..)?;
+        Variant::try_new_with_metadata(self.metadata, value_bytes)
+    }
+
+    /// Get a field's name by index in `0..self.len()`
+    pub fn field_name(&self, i: usize) -> Result<&'m str, ArrowError> {
+        let field_id =
+            self.header
+                .field_id_size
+                .unpack_usize(self.value, self.field_ids_start_byte, i)?;
+        self.metadata.get(field_id)
+    }
+
+    /// Returns an iterator of (name, value) pairs over the fields of this object.
+    pub fn iter(&self) -> impl Iterator<Item = (&'m str, Variant<'m, 'v>)> + '_ {
+        // NOTE: It is safe to unwrap because the constructor already made a successful traversal.
+        self.iter_checked().map(Result::unwrap)
+    }
+
+    // Fallible iteration over the fields of this object. The constructor traverses the iterator to
+    // prove it has no errors, so that all other use sites can blindly `unwrap` the result.
+    fn iter_checked(
+        &self,
+    ) -> impl Iterator<Item = Result<(&'m str, Variant<'m, 'v>), ArrowError>> + '_ {
+        (0..self.num_elements).map(move |i| Ok((self.field_name(i)?, self.field(i)?)))
+    }
+
+    /// Returns the value of the field with the specified name, if any.
+    ///
+    /// `Ok(None)` means the field does not exist; `Err` means the search encountered an error.
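+    ///
+    /// ```ignore
+    /// // Illustrative sketch only: `object` is a `VariantObject` built elsewhere
+    /// // (e.g. as in the tests below).
+    /// if let Some(age) = object.field_by_name("age")? {
+    ///     assert_eq!(age.as_int8(), Some(42));
+    /// }
+    /// ```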
+    pub fn field_by_name(&self, name: &str) -> Result<Option<Variant<'m, 'v>>, ArrowError> {
+        // Binary search through the field IDs of this object to find the requested field name.
+        //
+        // NOTE: This does not require a sorted metadata dictionary, because the variant spec
+        // requires object field ids to be lexically sorted by their corresponding string values,
+        // and probing the dictionary for a field id is always O(1) work.
+        let search_result =
+            try_binary_search_range_by(0..self.num_elements, &name, |i| self.field_name(i))?;
+
+        search_result.ok().map(|i| self.field(i)).transpose()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_variant_object_simple() {
+        // Create metadata with field names: "active", "age", "name" (sorted)
+        // Header: version=1, sorted=1, offset_size=1 (offset_size_minus_one=0)
+        // So header byte = 00_0_1_0001 = 0x11
+        let metadata_bytes = vec![
+            0b0001_0001,
+            3, // dictionary size
+            0, // "active"
+            6, // "age"
+            9, // "name"
+            13,
+            b'a',
+            b'c',
+            b't',
+            b'i',
+            b'v',
+            b'e',
+            b'a',
+            b'g',
+            b'e',
+            b'n',
+            b'a',
+            b'm',
+            b'e',
+        ];
+        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
+
+        // Create object value data for: {"active": true, "age": 42, "name": "hello"}
+        // Field IDs in sorted order: [0, 1, 2] (active, age, name)
+        // Header: basic_type=2, field_offset_size_minus_one=0, field_id_size_minus_one=0, is_large=0
+        // value_header = 0000_00_00 = 0x00
+        // So header byte = (0x00 << 2) | 2 = 0x02
+        let object_value = vec![
+            0x02, // header: basic_type=2, value_header=0x00
+            3,    // num_elements = 3
+            // Field IDs (1 byte each): active=0, age=1, name=2
+            0, 1, 2,
+            // Field offsets (1 byte each): 4 offsets total
+            0, // offset to first value (boolean true)
+            1, // offset to second value (int8)
+            3, // offset to third value (short string)
+            9, // end offset
+            // Values:
+            0x04, // boolean true: primitive_header=1, basic_type=0 -> (1 << 2) | 0 = 0x04
+            0x0C,
+            42, // int8: primitive_header=3, basic_type=0 -> (3 << 2) | 0 = 0x0C, then value 42
+            0x15, b'h', b'e', b'l', b'l',
+            b'o', // short string: length=5, basic_type=1 -> (5 << 2) | 1 = 0x15
+        ];
+
+        let variant_obj = VariantObject::try_new(metadata, &object_value).unwrap();
+
+        // Test basic properties
+        assert_eq!(variant_obj.len(), 3);
+        assert!(!variant_obj.is_empty());
+
+        // Test field access
+        let active_field = variant_obj.field_by_name("active").unwrap();
+        assert!(active_field.is_some());
+        assert_eq!(active_field.unwrap().as_boolean(), Some(true));
+
+        let age_field = variant_obj.field_by_name("age").unwrap();
+        assert!(age_field.is_some());
+        assert_eq!(age_field.unwrap().as_int8(), Some(42));
+
+        let name_field = variant_obj.field_by_name("name").unwrap();
+        assert!(name_field.is_some());
+        assert_eq!(name_field.unwrap().as_string(), Some("hello"));
+
+        // Test non-existent field
+        let missing_field = variant_obj.field_by_name("missing").unwrap();
+        assert!(missing_field.is_none());
+
+        // Test fields iterator
+        let fields: Vec<_> = variant_obj.iter().collect();
+        assert_eq!(fields.len(), 3);
+
+        // Fields should be in sorted order: active, age, name
+        assert_eq!(fields[0].0, "active");
+        assert_eq!(fields[0].1.as_boolean(), Some(true));
+
+        assert_eq!(fields[1].0, "age");
+        assert_eq!(fields[1].1.as_int8(), Some(42));
+
+        assert_eq!(fields[2].0, "name");
+        assert_eq!(fields[2].1.as_string(), Some("hello"));
+    }
+
+    #[test]
+    fn test_variant_object_empty() {
+        // Create metadata with no fields
+        let metadata_bytes = vec![
+            0x11, // header: version=1, sorted=1, offset_size_minus_one=0
+            0,    // dictionary_size = 0
+            0,    // offset[0] = 0 (end of dictionary)
+        ];
+        let metadata = VariantMetadata::try_new(&metadata_bytes).unwrap();
+
+        // Create empty object value data: {}
+        let object_value = vec![
+            0x02, // header: basic_type=2, value_header=0x00
+            0,    // num_elements = 0
+            0,    // single offset pointing to end
+                  // No field IDs, no values
+        ];
+
+        let variant_obj = VariantObject::try_new(metadata, &object_value).unwrap();
+
+        // Test basic properties
+        assert_eq!(variant_obj.len(), 0);
+        assert!(variant_obj.is_empty());
+
+        // Test field access on empty object
+        let missing_field = variant_obj.field_by_name("anything").unwrap();
+        assert!(missing_field.is_none());
+
+        // Test fields iterator on empty object
+        let fields: Vec<_> = variant_obj.iter().collect();
+        assert_eq!(fields.len(), 0);
+    }
+}
