This is an automated email from the ASF dual-hosted git repository.

jeffreyvo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b1dfb697ba Fix row slice bug in Union column decoding with many 
columns (#9000)
b1dfb697ba is described below

commit b1dfb697babcb614040ea2ae17e842e9db557f69
Author: Matthew Kim <[email protected]>
AuthorDate: Mon Jan 5 20:08:08 2026 -0500

    Fix row slice bug in Union column decoding with many columns (#9000)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/8999
    
    # Rationale for this change
    
    This PR fixes a bug in the row-to-column conversion for Union types when
    multiple union columns are present in the same row converter.
    
    Previously, decoding a union column consumed the entire remaining row
    slice, which prevented subsequent columns from reading their data
    correctly. The fix tracks the bytes consumed per row across all union
    fields so that each row slice is advanced by exactly that amount.
---
 arrow-row/src/lib.rs | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 154 insertions(+), 3 deletions(-)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 3c63f3bd6b..307281bf9d 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -1901,12 +1901,9 @@ unsafe fn decode_column(
 
                 let child_row = &row[1..];
                 rows_by_field[field_idx].push((idx, child_row));
-
-                *row = &row[row.len()..];
             }
 
             let mut child_arrays: Vec<ArrayRef> = 
Vec::with_capacity(converters.len());
-
             let mut offsets = (*mode == UnionMode::Dense).then(|| 
Vec::with_capacity(len));
 
             for (field_idx, converter) in converters.iter().enumerate() {
@@ -1928,6 +1925,14 @@ unsafe fn decode_column(
                         let child_array =
                             unsafe { converter.convert_raw(&mut child_data, 
validate_utf8) }?;
 
+                        // advance row slices by the bytes consumed
+                        for ((row_idx, original_bytes), remaining_bytes) in
+                            field_rows.iter().zip(child_data)
+                        {
+                            let consumed_length = 1 + original_bytes.len() - 
remaining_bytes.len();
+                            rows[*row_idx] = 
&rows[*row_idx][consumed_length..];
+                        }
+
                         
child_arrays.push(child_array.into_iter().next().unwrap());
                     }
                     UnionMode::Sparse => {
@@ -1949,6 +1954,14 @@ unsafe fn decode_column(
 
                         let child_array =
                             unsafe { converter.convert_raw(&mut sparse_data, 
validate_utf8) }?;
+
+                        // advance row slices by the bytes consumed for rows 
that belong to this field
+                        for (row_idx, child_row) in field_rows.iter() {
+                            let remaining_len = sparse_data[*row_idx].len();
+                            let consumed_length = 1 + child_row.len() - 
remaining_len;
+                            rows[*row_idx] = 
&rows[*row_idx][consumed_length..];
+                        }
+
                         
child_arrays.push(child_array.into_iter().next().unwrap());
                     }
                 }
@@ -4049,6 +4062,144 @@ mod tests {
         assert!(rows.row(3) < rows.row(1));
     }
 
+    #[test]
+    fn test_row_converter_roundtrip_with_many_union_columns() {
+        // col 1: Union(Int32, Utf8) [67, "hello"]
+        let fields1 = UnionFields::try_new(
+            vec![0, 1],
+            vec![
+                Field::new("int", DataType::Int32, true),
+                Field::new("string", DataType::Utf8, true),
+            ],
+        )
+        .unwrap();
+
+        let int_array1 = Int32Array::from(vec![Some(67), None]);
+        let string_array1 = StringArray::from(vec![None::<&str>, 
Some("hello")]);
+        let type_ids1 = vec![0i8, 1].into();
+
+        let union_array1 = UnionArray::try_new(
+            fields1.clone(),
+            type_ids1,
+            None,
+            vec![
+                Arc::new(int_array1) as ArrayRef,
+                Arc::new(string_array1) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        // col 2: Union(Int32, Utf8) [100, "world"]
+        let fields2 = UnionFields::try_new(
+            vec![0, 1],
+            vec![
+                Field::new("int", DataType::Int32, true),
+                Field::new("string", DataType::Utf8, true),
+            ],
+        )
+        .unwrap();
+
+        let int_array2 = Int32Array::from(vec![Some(100), None]);
+        let string_array2 = StringArray::from(vec![None::<&str>, 
Some("world")]);
+        let type_ids2 = vec![0i8, 1].into();
+
+        let union_array2 = UnionArray::try_new(
+            fields2.clone(),
+            type_ids2,
+            None,
+            vec![
+                Arc::new(int_array2) as ArrayRef,
+                Arc::new(string_array2) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        // create a row converter with 2 union columns
+        let field1 = Field::new("col1", DataType::Union(fields1, 
UnionMode::Sparse), true);
+        let field2 = Field::new("col2", DataType::Union(fields2, 
UnionMode::Sparse), true);
+
+        let sort_field1 = SortField::new(field1.data_type().clone());
+        let sort_field2 = SortField::new(field2.data_type().clone());
+
+        let converter = RowConverter::new(vec![sort_field1, 
sort_field2]).unwrap();
+
+        let rows = converter
+            .convert_columns(&[
+                Arc::new(union_array1.clone()) as ArrayRef,
+                Arc::new(union_array2.clone()) as ArrayRef,
+            ])
+            .unwrap();
+
+        // roundtrip
+        let out = converter.convert_rows(&rows).unwrap();
+
+        let [col1, col2] = out.as_slice() else {
+            panic!("expected 2 columns")
+        };
+
+        let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
+        let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
+
+        for (expected, got) in [union_array1, union_array2].iter().zip([col1, 
col2]) {
+            assert_eq!(expected.len(), got.len());
+            assert_eq!(expected.type_ids(), got.type_ids());
+
+            for i in 0..expected.len() {
+                assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
+            }
+        }
+    }
+
+    #[test]
+    fn test_row_converter_roundtrip_with_one_union_column() {
+        let fields = UnionFields::try_new(
+            vec![0, 1],
+            vec![
+                Field::new("int", DataType::Int32, true),
+                Field::new("string", DataType::Utf8, true),
+            ],
+        )
+        .unwrap();
+
+        let int_array = Int32Array::from(vec![Some(67), None]);
+        let string_array = StringArray::from(vec![None::<&str>, 
Some("hello")]);
+        let type_ids = vec![0i8, 1].into();
+
+        let union_array = UnionArray::try_new(
+            fields.clone(),
+            type_ids,
+            None,
+            vec![
+                Arc::new(int_array) as ArrayRef,
+                Arc::new(string_array) as ArrayRef,
+            ],
+        )
+        .unwrap();
+
+        let field = Field::new("col", DataType::Union(fields, 
UnionMode::Sparse), true);
+        let sort_field = SortField::new(field.data_type().clone());
+        let converter = RowConverter::new(vec![sort_field]).unwrap();
+
+        let rows = converter
+            .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
+            .unwrap();
+
+        // roundtrip
+        let out = converter.convert_rows(&rows).unwrap();
+
+        let [col1] = out.as_slice() else {
+            panic!("expected 1 column")
+        };
+
+        let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
+        assert_eq!(col.len(), union_array.len());
+        assert_eq!(col.type_ids(), union_array.type_ids());
+
+        for i in 0..col.len() {
+            assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
+        }
+    }
+
     #[test]
     fn rows_size_should_count_for_capacity() {
         let row_converter = 
RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();

Reply via email to