This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 67c9f9bb83 fix: Resolve Avro RecordEncoder bugs related to nullable Struct fields and Union type ids (#8935)
67c9f9bb83 is described below

commit 67c9f9bb8351d4fefd65603079a2f5f21d5a3087
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Dec 11 13:15:45 2025 -0600

    fix: Resolve Avro RecordEncoder bugs related to nullable Struct fields and Union type ids (#8935)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/8934
    
    # Rationale for this change
    
    The `arrow-avro` writer currently fails on two classes of *valid* Arrow
    inputs:
    
    1. **Nullable `Struct` with non‑nullable children + row‑wise sliced
    encoding**
    
    When encoding a `RecordBatch` row‑by‑row, a nullable `Struct` field
    whose child is non‑nullable can cause the writer to error with `Invalid
    argument error: Avro site '{field}' is non-nullable, but array contains
    nulls`, even when the parent `Struct` is null at that row and the child
    value should be ignored.
    
    2. **Dense `UnionArray` with non‑zero, non‑consecutive type ids**
    
    A dense `UnionArray` whose `UnionFields` use type ids such as `2` and
    `5` will currently fail with a `SchemaError("Binding and field
    mismatch")`, even though this layout is valid per Arrow’s union
    semantics.
    
    This PR updates the `RecordEncoder` to resolve both of these issues and
    better respect Arrow’s struct/union semantics.
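    
    The failing pattern, condensed from the regression tests below (assuming
    `schema` and `batch` are already built as in the tests and the enclosing
    function returns `Result`):
    
    ```rust
    // Condensed from the regression tests: build an AvroSoeFormat writer and
    // encode the batch one row at a time.
    let mut sink = Vec::new();
    let mut writer = WriterBuilder::new(schema)
        .with_fingerprint_strategy(FingerprintStrategy::Id(1))
        .build::<_, AvroSoeFormat>(&mut sink)?;
    for row in 0..batch.num_rows() {
        // Each one-row slice of a nullable struct column may carry child
        // nulls that are fully masked by the parent's null bitmap.
        writer.write(&batch.slice(row, 1))?;
    }
    writer.finish()?;
    ```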
    
    # What changes are included in this PR?
    
    This PR touches only the `arrow-avro` writer implementation,
    specifically `arrow-avro/src/writer/encoder.rs` and
    `arrow-avro/src/writer/mod.rs`.
    
    **1. Fix nullable struct + non‑nullable child handling**
    
    * Adjusts the `RecordEncoder` / `StructEncoder` path so that **child
    field null validation is masked by the parent `Struct`’s null bitmap**.
    * For rows where the parent `Struct` value is null, the encoder now
    **skips encoding the non‑nullable children** for that row, instead of
    treating any child‑side nulls as a violation of the Avro site’s
    nullability.
    * This ensures that row‑wise encoding of a sliced `RecordBatch`, like
    the one in the issue’s reproducing test, now succeeds without triggering
    `Invalid argument error: Avro site '{field}' is non-nullable, but array
    contains nulls`.
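    
    A minimal sketch of the masking rule (a hypothetical standalone helper for
    illustration only; the actual change removes the eager whole-array
    validation, so rows where the parent is null never touch the child):
    
    ```rust
    use arrow_array::{Array, StructArray};
    use arrow_schema::ArrowError;
    
    // A non-nullable child is only invalid where the parent struct itself is
    // valid; child nulls sitting under a null parent are masked and are never
    // encoded.
    fn validate_non_nullable_child(
        parent: &StructArray,
        child_idx: usize,
    ) -> Result<(), ArrowError> {
        let child = parent.column(child_idx);
        for row in 0..parent.len() {
            if parent.is_valid(row) && child.is_null(row) {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "non-nullable child is null at row {row} under a valid parent"
                )));
            }
        }
        Ok(())
    }
    ```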
    
    **2. Support dense unions with non‑zero, non‑consecutive type ids**
    
    * Updates the union encoding path (`UnionEncoder`) so that it no longer
    assumes Arrow dense union type IDs are `0..N-1`.
    * The encoder now **builds an explicit mapping from Arrow `type_ids` (as
    declared in `UnionFields`) to Avro union branch indices**, and uses this
    mapping when:
      * constructing the union’s Avro schema binding, and
      * writing out the branch index and value for each union element.
    * As a result, dense unions with type ids such as `2` and `5` now encode
    successfully, matching Arrow’s semantics, which require only that type ids
    be consistent with `UnionFields`, not that they be contiguous or zero‑based.
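    
    The lookup-table construction mirrors what `UnionEncoder::try_new` now does
    in the diff below, shown here as a free function for clarity:
    
    ```rust
    use arrow_schema::{DataType, Field, UnionFields};
    
    // Arrow type ids may be sparse (e.g. [2, 5]); Avro branch indices are
    // assigned by position in `UnionFields`, not by the type id value.
    fn type_id_to_branch_index(fields: &UnionFields) -> Vec<Option<usize>> {
        let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
        let mut lookup = vec![None; (max_type_id + 1) as usize];
        for (branch, (type_id, _field)) in fields.iter().enumerate() {
            lookup[type_id as usize] = Some(branch);
        }
        lookup
    }
    
    fn main() {
        let fields = UnionFields::new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        );
        let lookup = type_id_to_branch_index(&fields);
        assert_eq!(lookup[2], Some(0)); // type id 2 -> Avro branch 0
        assert_eq!(lookup[5], Some(1)); // type id 5 -> Avro branch 1
    }
    ```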
    
    **3. Regression tests for both bugs**
    
    Adds targeted regression tests under `arrow-avro/src/writer/mod.rs`’s
    test module to validate the fixes:
    
    1. **`test_nullable_struct_with_nonnullable_field_sliced_encoding`**
    * Builds the nullable `Struct` + non‑nullable child scenario from the
    issue.
    * Encodes the `RecordBatch` one row at a time via
    `WriterBuilder::new(schema).with_fingerprint_strategy(FingerprintStrategy::Id(1)).build::<_, AvroSoeFormat>(...)`
    and asserts all rows encode successfully.
    2. **`test_nullable_struct_with_decimal_and_timestamp_sliced`**
    * Constructs a `RecordBatch` containing nullable `Struct` fields
    populated with `Decimal128` and `TimestampMicrosecond` types to verify
    encoding of complex nested data.
    * Encodes the `RecordBatch` one row at a time using `AvroSoeFormat` and
    `FingerprintStrategy::Id(1)`, asserting that each sliced row encodes
    successfully.
    3. **`non_nullable_child_in_nullable_struct_should_encode_per_row`**
    * Builds a test case with a nullable `Struct` column containing a
    non-nullable child field, alongside a timestamp column.
    * Slices a single row from the batch and writes it via `AvroSoeFormat`,
    asserting that `writer.write` returns `Ok` to confirm the fix for sliced
    encoding constraints.
    4. **`test_union_nonzero_type_ids`**
    * Constructs a dense `UnionArray` whose `UnionFields` use type ids `[2,
    5]` and a mix of string/int values.
    * Encodes via `AvroWriter` and asserts that writing and finishing the
    writer both succeed without error.
    
    Together these tests reproduce the failures described in #8934 and
    confirm that the new encoder behavior handles them correctly.
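    
    For reference, the union regression test builds its input roughly as
    follows (condensed from `test_union_nonzero_type_ids` in the diff below):
    
    ```rust
    use std::sync::Arc;
    
    use arrow_array::{ArrayRef, Int32Array, StringArray, UnionArray};
    use arrow_buffer::Buffer;
    use arrow_schema::{ArrowError, DataType, Field, UnionFields};
    
    fn build_union() -> Result<UnionArray, ArrowError> {
        // Dense union whose type ids are 2 and 5 rather than 0 and 1.
        let union_fields = UnionFields::new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        );
        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
        UnionArray::try_new(
            union_fields,
            type_ids.into(),
            Some(offsets.into()),
            vec![
                Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef,
                Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
            ],
        )
    }
    ```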
    
    # Are these changes tested?
    
    Yes.
    
    * New unit tests are added for both regression scenarios (nullable
    struct + non‑nullable child, and dense union with non‑zero &
    non‑consecutive type ids).
    * Existing writer / reader integration tests (round‑trip tests, nested
    record tests, etc.) continue to pass unchanged, ensuring that the
    crate’s previously tested behavior and public API remain intact with no
    breaking changes.
    
    # Are there any user-facing changes?
    
    1. **Behavioral change (bug fix):**
    * Previously, valid and supported Arrow inputs could cause the Avro
    writer to error or panic in the two scenarios described above.
    * After this change, those inputs encode successfully and produce Avro
    output consistent with the generated or provided Avro schema.
    2. **APIs and configuration:**
    * No public APIs, types, or configuration options are added, removed, or
    renamed.
    * The on‑wire Avro representation for already‑supported layouts is
    unchanged; the encoder now simply accepts valid Arrow layouts that
    previously failed.
    
    The change is a strictly non-breaking, backwards-compatible bug fix that
    makes the `arrow-avro` writer function as expected.
---
 arrow-avro/src/writer/encoder.rs | 391 +++++++++++++++++----------------------
 arrow-avro/src/writer/mod.rs     | 235 ++++++++++++++++++++++-
 2 files changed, 399 insertions(+), 227 deletions(-)

diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs
index 79aee4fae0..c638c2b73f 100644
--- a/arrow-avro/src/writer/encoder.rs
+++ b/arrow-avro/src/writer/encoder.rs
@@ -205,13 +205,13 @@ fn write_optional_index<W: Write + ?Sized>(
 }
 
 #[derive(Debug, Clone)]
-enum NullState {
+enum NullState<'a> {
     NonNullable,
     NullableNoNulls {
         union_value_byte: u8,
     },
     Nullable {
-        nulls: NullBuffer,
+        nulls: &'a NullBuffer,
         null_order: Nullability,
     },
 }
@@ -221,13 +221,12 @@ enum NullState {
 /// - Carries the per-site nullability **state** as a single enum that enforces invariants
 pub(crate) struct FieldEncoder<'a> {
     encoder: Encoder<'a>,
-    null_state: NullState,
+    null_state: NullState<'a>,
 }
 
 impl<'a> FieldEncoder<'a> {
     fn make_encoder(
         array: &'a dyn Array,
-        field: &Field,
         plan: &FieldPlan,
         nullability: Option<Nullability>,
     ) -> Result<Self, ArrowError> {
@@ -563,61 +562,48 @@ impl<'a> FieldEncoder<'a> {
                     .as_any()
                     .downcast_ref::<UnionArray>()
                     .ok_or_else(|| ArrowError::SchemaError("Expected UnionArray".into()))?;
-
                 Encoder::Union(Box::new(UnionEncoder::try_new(arr, bindings)?))
             }
             FieldPlan::RunEndEncoded {
                 values_nullability,
                 value_plan,
             } => {
-                let dt = array.data_type();
-                let values_field = match dt {
-                    DataType::RunEndEncoded(_re_field, v_field) => v_field.as_ref(),
-                    other => {
-                        return Err(ArrowError::SchemaError(format!(
-                            "Avro RunEndEncoded site requires Arrow DataType::RunEndEncoded, found: {other:?}"
-                        )));
-                    }
-                };
                 // Helper closure to build a typed RunEncodedEncoder<R>
                 let build = |run_arr_any: &'a dyn Array| -> Result<Encoder<'a>, ArrowError> {
                     if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int16Type>>() {
-                        let values_enc = prepare_value_site_encoder(
-                            arr.values().as_ref(),
-                            values_field,
-                            *values_nullability,
-                            value_plan.as_ref(),
-                        )?;
                         return Ok(Encoder::RunEncoded16(Box::new(RunEncodedEncoder::<
                             Int16Type,
                         >::new(
-                            arr, values_enc
+                            arr,
+                            FieldEncoder::make_encoder(
+                                arr.values().as_ref(),
+                                value_plan.as_ref(),
+                                *values_nullability,
+                            )?,
                         ))));
                     }
                     if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int32Type>>() {
-                        let values_enc = prepare_value_site_encoder(
-                            arr.values().as_ref(),
-                            values_field,
-                            *values_nullability,
-                            value_plan.as_ref(),
-                        )?;
                         return Ok(Encoder::RunEncoded32(Box::new(RunEncodedEncoder::<
                             Int32Type,
                         >::new(
-                            arr, values_enc
+                            arr,
+                            FieldEncoder::make_encoder(
+                                arr.values().as_ref(),
+                                value_plan.as_ref(),
+                                *values_nullability,
+                            )?,
                         ))));
                     }
                     if let Some(arr) = run_arr_any.as_any().downcast_ref::<RunArray<Int64Type>>() {
-                        let values_enc = prepare_value_site_encoder(
-                            arr.values().as_ref(),
-                            values_field,
-                            *values_nullability,
-                            value_plan.as_ref(),
-                        )?;
                         return Ok(Encoder::RunEncoded64(Box::new(RunEncodedEncoder::<
                             Int64Type,
                         >::new(
-                            arr, values_enc
+                            arr,
+                            FieldEncoder::make_encoder(
+                                arr.values().as_ref(),
+                                value_plan.as_ref(),
+                                *values_nullability,
+                            )?,
                         ))));
                     }
                     Err(ArrowError::SchemaError(
@@ -629,29 +615,19 @@ impl<'a> FieldEncoder<'a> {
             }
         };
         // Compute the effective null state from writer-declared nullability and data nulls.
-        let null_state = match (nullability, array.null_count() > 0) {
-            (None, false) => NullState::NonNullable,
-            (None, true) => {
-                return Err(ArrowError::InvalidArgumentError(format!(
-                    "Avro site '{}' is non-nullable, but array contains nulls",
-                    field.name()
-                )));
-            }
-            (Some(order), false) => {
-                // Optimization: drop any bitmap; emit a constant "value" branch byte.
-                NullState::NullableNoNulls {
-                    union_value_byte: union_value_branch_byte(order, false),
+        let null_state = match nullability {
+            None => NullState::NonNullable,
+            Some(null_order) => {
+                match array.nulls() {
+                    Some(nulls) if array.null_count() > 0 => {
+                        NullState::Nullable { nulls, null_order }
+                    }
+                    _ => NullState::NullableNoNulls {
+                        // Nullable site with no null buffer for this view
+                        union_value_byte: union_value_branch_byte(null_order, false),
+                    },
                 }
             }
-            (Some(null_order), true) => {
-                let Some(nulls) = array.nulls().cloned() else {
-                    return Err(ArrowError::InvalidArgumentError(format!(
-                        "Array for Avro site '{}' reports nulls but has no null buffer",
-                        field.name()
-                    )));
-                };
-                NullState::Nullable { nulls, null_order }
-            }
         };
         Ok(Self {
             encoder,
@@ -797,8 +773,6 @@ impl RecordEncoder {
         &'a self,
         batch: &'a RecordBatch,
     ) -> Result<Vec<FieldEncoder<'a>>, ArrowError> {
-        let schema_binding = batch.schema();
-        let fields = schema_binding.fields();
         let arrays = batch.columns();
         let mut out = Vec::with_capacity(self.columns.len());
         for col_plan in self.columns.iter() {
@@ -806,7 +780,6 @@ impl RecordEncoder {
             let array = arrays.get(arrow_index).ok_or_else(|| {
                 ArrowError::SchemaError(format!("Column index {arrow_index} 
out of range"))
             })?;
-            let field = fields[arrow_index].as_ref();
             #[cfg(not(feature = "avro_custom_types"))]
             let site_nullability = match &col_plan.plan {
                 FieldPlan::RunEndEncoded { .. } => None,
@@ -814,13 +787,11 @@ impl RecordEncoder {
             };
             #[cfg(feature = "avro_custom_types")]
             let site_nullability = col_plan.nullability;
-            let encoder = prepare_value_site_encoder(
+            out.push(FieldEncoder::make_encoder(
                 array.as_ref(),
-                field,
-                site_nullability,
                 &col_plan.plan,
-            )?;
-            out.push(encoder);
+                site_nullability,
+            )?);
         }
         Ok(out)
     }
@@ -1348,39 +1319,14 @@ impl<'a> MapEncoder<'a> {
                 )));
             }
         };
-
-        let entries_struct_fields = match map.data_type() {
-            DataType::Map(entries, _) => match entries.data_type() {
-                DataType::Struct(fs) => fs,
-                other => {
-                    return Err(ArrowError::SchemaError(format!(
-                        "Arrow Map entries must be Struct, found: {other:?}"
-                    )));
-                }
-            },
-            _ => {
-                return Err(ArrowError::SchemaError(
-                    "Expected MapArray with DataType::Map".into(),
-                ));
-            }
-        };
-
-        let v_idx = find_map_value_field_index(entries_struct_fields).ok_or_else(|| {
-            ArrowError::SchemaError("Map entries struct missing value field".into())
-        })?;
-        let value_field = entries_struct_fields[v_idx].as_ref();
-
-        let values_enc = prepare_value_site_encoder(
-            map.values().as_ref(),
-            value_field,
-            values_nullability,
-            value_plan,
-        )?;
-
         Ok(Self {
             map,
             keys: keys_kind,
-            values: values_enc,
+            values: FieldEncoder::make_encoder(
+                map.values().as_ref(),
+                value_plan,
+                values_nullability,
+            )?,
             keys_offset: keys_arr.offset(),
             values_offset: map.values().offset(),
         })
@@ -1452,6 +1398,7 @@ impl EnumEncoder<'_> {
 struct UnionEncoder<'a> {
     encoders: Vec<FieldEncoder<'a>>,
     array: &'a UnionArray,
+    type_id_to_encoder_index: Vec<Option<usize>>,
 }
 
 impl<'a> UnionEncoder<'a> {
@@ -1459,7 +1406,6 @@ impl<'a> UnionEncoder<'a> {
         let DataType::Union(fields, UnionMode::Dense) = array.data_type() else {
             return Err(ArrowError::SchemaError("Expected Dense UnionArray".into()));
         };
-
         if fields.len() != field_bindings.len() {
             return Err(ArrowError::SchemaError(format!(
                 "Mismatched number of union branches between Arrow array ({}) 
and encoding plan ({})",
@@ -1467,37 +1413,44 @@ impl<'a> UnionEncoder<'a> {
                 field_bindings.len()
             )));
         }
+        let max_type_id = fields.iter().map(|(tid, _)| tid).max().unwrap_or(0);
+        let mut type_id_to_encoder_index: Vec<Option<usize>> =
+            vec![None; (max_type_id + 1) as usize];
         let mut encoders = Vec::with_capacity(fields.len());
-        for (type_id, field_ref) in fields.iter() {
+        for (i, (type_id, _)) in fields.iter().enumerate() {
             let binding = field_bindings
-                .get(type_id as usize)
+                .get(i)
                 .ok_or_else(|| ArrowError::SchemaError("Binding and field 
mismatch".to_string()))?;
-
-            let child = array.child(type_id).as_ref();
-
-            let encoder = prepare_value_site_encoder(
-                child,
-                field_ref.as_ref(),
-                binding.nullability,
+            encoders.push(FieldEncoder::make_encoder(
+                array.child(type_id).as_ref(),
                 &binding.plan,
-            )?;
-            encoders.push(encoder);
+                binding.nullability,
+            )?);
+            type_id_to_encoder_index[type_id as usize] = Some(i);
         }
-        Ok(Self { encoders, array })
+        Ok(Self {
+            encoders,
+            array,
+            type_id_to_encoder_index,
+        })
     }
 
     fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
+        // SAFETY: `idx` is always in bounds because:
+        // 1. The encoder is called from `RecordEncoder::encode,` which iterates over `0..batch.num_rows()`
+        // 2. `self.array` is a column from the same batch, so its length equals `batch.num_rows()`
+        // 3. `type_ids()` returns a buffer with exactly `self.array.len()` entries (one per logical element)
         let type_id = self.array.type_ids()[idx];
-        let branch_index = type_id as usize;
-        write_int(out, type_id as i32)?;
-        let child_row = self.array.value_offset(idx);
-
-        let encoder = self
-            .encoders
-            .get_mut(branch_index)
+        let encoder_index = self
+            .type_id_to_encoder_index
+            .get(type_id as usize)
+            .and_then(|opt| *opt)
             .ok_or_else(|| ArrowError::SchemaError(format!("Invalid type_id 
{type_id}")))?;
-
-        encoder.encode(out, child_row)
+        write_int(out, encoder_index as i32)?;
+        let encoder = self.encoders.get_mut(encoder_index).ok_or_else(|| {
+            ArrowError::SchemaError(format!("Invalid encoder index {encoder_index}"))
+        })?;
+        encoder.encode(out, self.array.value_offset(idx))
     }
 }
 
@@ -1510,23 +1463,16 @@ impl<'a> StructEncoder<'a> {
         array: &'a StructArray,
         field_bindings: &[FieldBinding],
     ) -> Result<Self, ArrowError> {
-        let DataType::Struct(fields) = array.data_type() else {
-            return Err(ArrowError::SchemaError("Expected Struct".into()));
-        };
         let mut encoders = Vec::with_capacity(field_bindings.len());
         for field_binding in field_bindings {
             let idx = field_binding.arrow_index;
             let column = array.columns().get(idx).ok_or_else(|| {
                 ArrowError::SchemaError(format!("Struct child index {idx} out 
of range"))
             })?;
-            let field = fields.get(idx).ok_or_else(|| {
-                ArrowError::SchemaError(format!("Struct child index {idx} out of range"))
-            })?;
-            let encoder = prepare_value_site_encoder(
+            let encoder = FieldEncoder::make_encoder(
                 column.as_ref(),
-                field,
-                field_binding.nullability,
                 &field_binding.plan,
+                field_binding.nullability,
             )?;
             encoders.push(encoder);
         }
@@ -1583,24 +1529,13 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
         items_nullability: Option<Nullability>,
         item_plan: &FieldPlan,
     ) -> Result<Self, ArrowError> {
-        let child_field = match list.data_type() {
-            DataType::List(field) => field.as_ref(),
-            DataType::LargeList(field) => field.as_ref(),
-            _ => {
-                return Err(ArrowError::SchemaError(
-                    "Expected List or LargeList for ListEncoder".into(),
-                ));
-            }
-        };
-        let values_enc = prepare_value_site_encoder(
-            list.values().as_ref(),
-            child_field,
-            items_nullability,
-            item_plan,
-        )?;
         Ok(Self {
             list,
-            values: values_enc,
+            values: FieldEncoder::make_encoder(
+                list.values().as_ref(),
+                item_plan,
+                items_nullability,
+            )?,
             values_offset: list.values().offset(),
         })
     }
@@ -1647,24 +1582,13 @@ impl<'a, O: OffsetSizeTrait> ListViewEncoder<'a, O> {
         items_nullability: Option<Nullability>,
         item_plan: &FieldPlan,
     ) -> Result<Self, ArrowError> {
-        let child_field = match list.data_type() {
-            DataType::ListView(field) => field.as_ref(),
-            DataType::LargeListView(field) => field.as_ref(),
-            _ => {
-                return Err(ArrowError::SchemaError(
-                    "Expected ListView or LargeListView for ListViewEncoder".into(),
-                ));
-            }
-        };
-        let values_enc = prepare_value_site_encoder(
-            list.values().as_ref(),
-            child_field,
-            items_nullability,
-            item_plan,
-        )?;
         Ok(Self {
             list,
-            values: values_enc,
+            values: FieldEncoder::make_encoder(
+                list.values().as_ref(),
+                item_plan,
+                items_nullability,
+            )?,
             values_offset: list.values().offset(),
         })
     }
@@ -1701,23 +1625,13 @@ impl<'a> FixedSizeListEncoder<'a> {
         items_nullability: Option<Nullability>,
         item_plan: &FieldPlan,
     ) -> Result<Self, ArrowError> {
-        let child_field = match list.data_type() {
-            DataType::FixedSizeList(field, _len) => field.as_ref(),
-            _ => {
-                return Err(ArrowError::SchemaError(
-                    "Expected FixedSizeList for FixedSizeListEncoder".into(),
-                ));
-            }
-        };
-        let values_enc = prepare_value_site_encoder(
-            list.values().as_ref(),
-            child_field,
-            items_nullability,
-            item_plan,
-        )?;
         Ok(Self {
             list,
-            values: values_enc,
+            values: FieldEncoder::make_encoder(
+                list.values().as_ref(),
+                item_plan,
+                items_nullability,
+            )?,
             values_offset: list.values().offset(),
             elem_len: list.value_length() as usize,
         })
@@ -1735,16 +1649,6 @@ impl<'a> FixedSizeListEncoder<'a> {
     }
 }
 
-fn prepare_value_site_encoder<'a>(
-    values_array: &'a dyn Array,
-    value_field: &Field,
-    nullability: Option<Nullability>,
-    plan: &FieldPlan,
-) -> Result<FieldEncoder<'a>, ArrowError> {
-    // Effective nullability is computed here from the writer-declared site nullability and data.
-    FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
-}
-
 /// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
 /// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
 struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
@@ -2049,8 +1953,7 @@ mod tests {
         plan: &FieldPlan,
         nullability: Option<Nullability>,
     ) -> Vec<u8> {
-        let field = Field::new("f", array.data_type().clone(), true);
-        let mut enc = FieldEncoder::make_encoder(array, &field, plan, nullability).unwrap();
+        let mut enc = FieldEncoder::make_encoder(array, plan, nullability).unwrap();
         let mut out = Vec::new();
         for i in 0..array.len() {
             enc.encode(&mut out, i).unwrap();
@@ -2381,9 +2284,7 @@ mod tests {
             FixedSizeBinaryArray::try_new(10, arrow_buffer::Buffer::from(vec![0u8; 10]), None)
                 .unwrap();
         let plan = FieldPlan::Uuid;
-
-        let field = Field::new("f", arr.data_type().clone(), true);
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, &plan, None).unwrap();
+        let mut enc = FieldEncoder::make_encoder(&arr, &plan, None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -2749,8 +2650,7 @@ mod tests {
     #[test]
     fn duration_encoder_year_month_rejects_negative() {
         let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
-        let field = Field::new("f", 
DataType::Interval(IntervalUnit::YearMonth), true);
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, 
&FieldPlan::Scalar, None).unwrap();
+        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, 
None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -2777,8 +2677,7 @@ mod tests {
     fn duration_encoder_day_time_rejects_negative() {
         let bad = IntervalDayTimeType::make_value(-1, 0);
         let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
-        let field = Field::new("f", DataType::Interval(IntervalUnit::DayTime), 
true);
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, 
&FieldPlan::Scalar, None).unwrap();
+        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, 
None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -2805,8 +2704,7 @@ mod tests {
     fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
         let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
         let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
-        let field = Field::new("f", 
DataType::Interval(IntervalUnit::MonthDayNano), true);
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, 
&FieldPlan::Scalar, None).unwrap();
+        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, 
None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -2854,8 +2752,7 @@ mod tests {
         let nanos = ((u64::from(u32::MAX) + 1) * 1_000_000) as i64;
         let v = IntervalMonthDayNanoType::make_value(0, 0, nanos);
         let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v].into();
-        let field = Field::new("f", 
DataType::Interval(IntervalUnit::MonthDayNano), true);
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, 
&FieldPlan::Scalar, None).unwrap();
+        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, 
None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -3007,9 +2904,7 @@ mod tests {
         // Time32(Second) must encode as Avro time-millis (ms since midnight).
         let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
             vec![0i32, 1, -2, 12_345].into();
-
         let got = encode_all(&arr, &FieldPlan::Scalar, None);
-
         let mut expected = Vec::new();
         for secs in [0i32, 1, -2, 12_345] {
             let millis = (secs as i64) * 1000;
@@ -3022,16 +2917,8 @@ mod tests {
     fn time32_seconds_to_millis_overflow() {
         // Choose a value that will overflow i32 when multiplied by 1000.
         let overflow_secs: i32 = i32::MAX / 1000 + 1;
-        let arr: arrow_array::PrimitiveArray<arrow_array::types::Time32SecondType> =
-            vec![overflow_secs].into();
-
-        let field = arrow_schema::Field::new(
-            "f",
-            arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second),
-            true,
-        );
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
-
+        let arr: PrimitiveArray<Time32SecondType> = vec![overflow_secs].into();
+        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -3048,11 +2935,8 @@ mod tests {
     #[test]
     fn timestamp_seconds_to_millis_encoder() {
         // Timestamp(Second) must encode as Avro timestamp-millis (ms since epoch).
-        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
-            vec![0i64, 1, -1, 1_234_567_890].into();
-
+        let arr: PrimitiveArray<TimestampSecondType> = vec![0i64, 1, -1, 1_234_567_890].into();
         let got = encode_all(&arr, &FieldPlan::Scalar, None);
-
         let mut expected = Vec::new();
         for secs in [0i64, 1, -1, 1_234_567_890] {
             let millis = secs * 1000;
@@ -3065,16 +2949,8 @@ mod tests {
     fn timestamp_seconds_to_millis_overflow() {
         // Overflow i64 when multiplied by 1000.
         let overflow_secs: i64 = i64::MAX / 1000 + 1;
-        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampSecondType> =
-            vec![overflow_secs].into();
-
-        let field = arrow_schema::Field::new(
-            "f",
-            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, None),
-            true,
-        );
-        let mut enc = FieldEncoder::make_encoder(&arr, &field, &FieldPlan::Scalar, None).unwrap();
-
+        let arr: PrimitiveArray<TimestampSecondType> = vec![overflow_secs].into();
+        let mut enc = FieldEncoder::make_encoder(&arr, &FieldPlan::Scalar, None).unwrap();
         let mut out = Vec::new();
         let err = enc.encode(&mut out, 0).unwrap_err();
         match err {
@@ -3090,15 +2966,80 @@ mod tests {
 
     #[test]
     fn timestamp_nanos_encoder() {
-        let arr: arrow_array::PrimitiveArray<arrow_array::types::TimestampNanosecondType> =
-            vec![0i64, 1, -1, 123].into();
-
+        let arr: PrimitiveArray<TimestampNanosecondType> = vec![0i64, 1, -1, 123].into();
         let got = encode_all(&arr, &FieldPlan::Scalar, None);
-
         let mut expected = Vec::new();
         for ns in [0i64, 1, -1, 123] {
             expected.extend_from_slice(&avro_long_bytes(ns));
         }
         assert_bytes_eq(&got, &expected);
     }
+
+    #[test]
+    fn union_encoder_string_int_nonzero_type_ids() {
+        let strings = StringArray::from(vec!["hello", "world"]);
+        let ints = Int32Array::from(vec![10, 20, 30]);
+        let union_fields = UnionFields::new(
+            vec![2, 5],
+            vec![
+                Field::new("v_str", DataType::Utf8, true),
+                Field::new("v_int", DataType::Int32, true),
+            ],
+        );
+        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
+        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
+        let union_array = UnionArray::try_new(
+            union_fields,
+            type_ids.into(),
+            Some(offsets.into()),
+            vec![Arc::new(strings), Arc::new(ints)],
+        )
+        .unwrap();
+        let plan = FieldPlan::Union {
+            bindings: vec![
+                FieldBinding {
+                    arrow_index: 0,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+                FieldBinding {
+                    arrow_index: 1,
+                    nullability: None,
+                    plan: FieldPlan::Scalar,
+                },
+            ],
+        };
+        let got = encode_all(&union_array, &plan, None);
+        let mut expected = Vec::new();
+        expected.extend(avro_long_bytes(0));
+        expected.extend(avro_len_prefixed_bytes(b"hello"));
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_long_bytes(10));
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_long_bytes(20));
+        expected.extend(avro_long_bytes(0));
+        expected.extend(avro_len_prefixed_bytes(b"world"));
+        expected.extend(avro_long_bytes(1));
+        expected.extend(avro_long_bytes(30));
+        assert_bytes_eq(&got, &expected);
+    }
+
+    #[test]
+    fn nullable_state_with_null_buffer_and_zero_nulls() {
+        let values = vec![1i32, 2, 3];
+        let arr = Int32Array::from_iter_values_with_nulls(values, Some(NullBuffer::new_valid(3)));
+        assert_eq!(arr.null_count(), 0);
+        assert!(arr.nulls().is_some());
+        let plan = FieldPlan::Scalar;
+        let enc = FieldEncoder::make_encoder(&arr, &plan, Some(Nullability::NullFirst)).unwrap();
+        match enc.null_state {
+            NullState::NullableNoNulls { union_value_byte } => {
+                assert_eq!(
+                    union_value_byte,
+                    union_value_branch_byte(Nullability::NullFirst, false)
+                );
+            }
+            other => panic!("expected NullableNoNulls, got {other:?}"),
+        }
+    }
 }
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 231c9846f9..9b3eea1d6f 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -409,10 +409,11 @@ mod tests {
     };
     use arrow_array::{
         Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
-        StructArray, UnionArray,
+        StringArray, StructArray, UnionArray,
     };
     #[cfg(feature = "avro_custom_types")]
-    use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
+    use arrow_array::{Int16Array, Int64Array, RunArray};
+    use arrow_schema::UnionMode;
     #[cfg(not(feature = "avro_custom_types"))]
     use arrow_schema::{DataType, Field, Schema};
     #[cfg(feature = "avro_custom_types")]
@@ -491,6 +492,236 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
+        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
+        use arrow_buffer::NullBuffer;
+        use arrow_schema::{DataType, Field, Fields, Schema};
+        use std::sync::Arc;
+        let inner_fields = Fields::from(vec![
+            Field::new("id", DataType::Int32, false), // non-nullable
+            Field::new("name", DataType::Utf8, true), // nullable
+        ]);
+        let inner_struct_type = DataType::Struct(inner_fields.clone());
+        let schema = Schema::new(vec![
+            Field::new("before", inner_struct_type.clone(), true), // nullable 
struct
+            Field::new("after", inner_struct_type.clone(), true),  // nullable 
struct
+            Field::new("op", DataType::Utf8, false),               // 
non-nullable
+        ]);
+        let before_ids = Int32Array::from(vec![None, None]);
+        let before_names = StringArray::from(vec![None::<&str>, None]);
+        let before_struct = StructArray::new(
+            inner_fields.clone(),
+            vec![
+                Arc::new(before_ids) as ArrayRef,
+                Arc::new(before_names) as ArrayRef,
+            ],
+            Some(NullBuffer::from(vec![false, false])),
+        );
+        let after_ids = Int32Array::from(vec![1, 2]); // non-nullable, no nulls
+        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
+        let after_struct = StructArray::new(
+            inner_fields.clone(),
+            vec![
+                Arc::new(after_ids) as ArrayRef,
+                Arc::new(after_names) as ArrayRef,
+            ],
+            Some(NullBuffer::from(vec![true, true])),
+        );
+        let op_col = StringArray::from(vec!["r", "r"]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(before_struct) as ArrayRef,
+                Arc::new(after_struct) as ArrayRef,
+                Arc::new(op_col) as ArrayRef,
+            ],
+        )
+        .expect("failed to create test batch");
+        let mut sink = Vec::new();
+        let mut writer = WriterBuilder::new(schema)
+            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
+            .build::<_, AvroSoeFormat>(&mut sink)
+            .expect("failed to create writer");
+        for row_idx in 0..batch.num_rows() {
+            let single_row = batch.slice(row_idx, 1);
+            let after_col = single_row.column(1);
+            assert_eq!(
+                after_col.null_count(),
+                0,
+                "after column should have no nulls in sliced row"
+            );
+            writer
+                .write(&single_row)
+                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: 
{e}"));
+        }
+        writer.finish().expect("failed to finish writer");
+        assert!(!sink.is_empty(), "encoded output should not be empty");
+    }
+
+    #[test]
+    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
+        use arrow_array::{
+            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
+            TimestampMicrosecondArray,
+        };
+        use arrow_buffer::NullBuffer;
+        use arrow_schema::{DataType, Field, Fields, Schema};
+        use std::sync::Arc;
+        let row_fields = Fields::from(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("category", DataType::Utf8, true),
+            Field::new("price", DataType::Decimal128(10, 2), true),
+            Field::new("stock_quantity", DataType::Int32, true),
+            Field::new(
+                "created_at",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true,
+            ),
+        ]);
+        let row_struct_type = DataType::Struct(row_fields.clone());
+        let schema = Schema::new(vec![
+            Field::new("before", row_struct_type.clone(), true),
+            Field::new("after", row_struct_type.clone(), true),
+            Field::new("op", DataType::Utf8, false),
+        ]);
+        let before_struct = StructArray::new_null(row_fields.clone(), 2);
+        let ids = Int32Array::from(vec![1, 2]);
+        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
+        let categories = StringArray::from(vec![Some("Electronics"), 
Some("Electronics")]);
+        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
+            .with_precision_and_scale(10, 2)
+            .unwrap();
+        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
+        let timestamps = TimestampMicrosecondArray::from(vec![
+            Some(1700000000000000i64),
+            Some(1700000001000000i64),
+        ]);
+        let after_struct = StructArray::new(
+            row_fields.clone(),
+            vec![
+                Arc::new(ids) as ArrayRef,
+                Arc::new(names) as ArrayRef,
+                Arc::new(categories) as ArrayRef,
+                Arc::new(prices) as ArrayRef,
+                Arc::new(quantities) as ArrayRef,
+                Arc::new(timestamps) as ArrayRef,
+            ],
+            Some(NullBuffer::from(vec![true, true])),
+        );
+        let op_col = StringArray::from(vec!["r", "r"]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(before_struct) as ArrayRef,
+                Arc::new(after_struct) as ArrayRef,
+                Arc::new(op_col) as ArrayRef,
+            ],
+        )
+        .expect("failed to create products batch");
+        let mut sink = Vec::new();
+        let mut writer = WriterBuilder::new(schema)
+            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
+            .build::<_, AvroSoeFormat>(&mut sink)
+            .expect("failed to create writer");
+        // Encode row by row
+        for row_idx in 0..batch.num_rows() {
+            let single_row = batch.slice(row_idx, 1);
+            writer
+                .write(&single_row)
+                .unwrap_or_else(|e| panic!("Failed to encode product row 
{row_idx}: {e}"));
+        }
+        writer.finish().expect("failed to finish writer");
+        assert!(!sink.is_empty());
+    }
+
+    #[test]
+    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
+        use arrow_array::{
+            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
+        };
+        use arrow_schema::{DataType, Field, Fields, Schema};
+        use std::sync::Arc;
+        let row_fields = Fields::from(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]);
+        let row_struct_dt = DataType::Struct(row_fields.clone());
+        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
+        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
+        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
+        let after: ArrayRef = Arc::new(StructArray::new(
+            row_fields.clone(),
+            vec![id_col, name_col],
+            None,
+        ));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("before", row_struct_dt.clone(), true),
+            Field::new("after", row_struct_dt, true),
+            Field::new("op", DataType::Utf8, false),
+            Field::new("ts_ms", DataType::Int64, false),
+        ]));
+        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
+        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
+        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
+        let mut buf = Vec::new();
+        let mut writer = WriterBuilder::new(schema.as_ref().clone())
+            .build::<_, AvroSoeFormat>(&mut buf)
+            .unwrap();
+        let single = batch.slice(0, 1);
+        let res = writer.write(&single);
+        assert!(
+            res.is_ok(),
+            "expected to encode successfully, got: {:?}",
+            res.err()
+        );
+    }
+
+    #[test]
+    fn test_union_nonzero_type_ids() -> Result<(), ArrowError> {
+        use arrow_array::UnionArray;
+        use arrow_buffer::Buffer;
+        use arrow_schema::UnionFields;
+        let union_fields = UnionFields::new(
+            vec![2, 5],
+            vec![
+                Field::new("v_str", DataType::Utf8, true),
+                Field::new("v_int", DataType::Int32, true),
+            ],
+        );
+        let strings = StringArray::from(vec!["hello", "world"]);
+        let ints = Int32Array::from(vec![10, 20, 30]);
+        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
+        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
+        let union_array = UnionArray::try_new(
+            union_fields.clone(),
+            type_ids.into(),
+            Some(offsets.into()),
+            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
+        )?;
+        let schema = Schema::new(vec![Field::new(
+            "union_col",
+            DataType::Union(union_fields, UnionMode::Dense),
+            false,
+        )]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(union_array) as ArrayRef],
+        )?;
+        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
+        assert!(
+            writer.write(&batch).is_ok(),
+            "Expected no error from writing"
+        );
+        writer.finish()?;
+        assert!(
+            writer.finish().is_ok(),
+            "Expected no error from finishing writer"
+        );
+        Ok(())
+    }
+
     #[test]
     fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

