adriangb opened a new issue, #21981:
URL: https://github.com/apache/datafusion/issues/21981

   ### Describe the bug
   
   `MinAccumulator` (and `MaxAccumulator`) over a `List<Utf8>` column whose 
**inner item Field** carries metadata produces `ScalarValue::List` results 
whose inner-Field metadata is **path-dependent**: preserved for all-null row 
groups but stripped for row groups that contain at least one non-null value. 
Concatenating those per-row-group results via `ScalarValue::iter_to_array` then 
fails with a type mismatch, and a SQL `MIN`/`MAX` over such a column with 
multiple partitions fails with `Internal error: cannot compare values of 
different or incompatible types`.
   
   The same failure mode applies to `LargeList` and `FixedSizeList`. `Struct` / 
`Map` / `Union` / `RunEndEncoded` are unaffected because their `try_from_array` 
paths preserve the source array's full `DataType`.
   
   ### Root cause
   
   The non-null path goes through 
[`ScalarValue::try_from_array`](https://github.com/apache/datafusion/blob/main/datafusion/common/src/scalar/mod.rs#L3967-L3974):
   
   ```rust
   DataType::List(field) => {
       let list_array = array.as_list::<i32>();
       let nested_array = list_array.value(index);
       SingleRowListArrayBuilder::new(nested_array)
           .with_field(field)
           .build_list_scalar()
   }
   ```
   
   But 
[`SingleRowListArrayBuilder::with_field`](https://github.com/apache/datafusion/blob/main/datafusion/common/src/utils/mod.rs#L449-L452)
 only copies `name` and `nullable`:
   
   ```rust
   pub fn with_field(self, field: &Field) -> Self {
       self.with_field_name(Some(field.name().to_owned()))
           .with_nullable(field.is_nullable())
   }
   ```
   
   Then 
[`into_field_and_arr`](https://github.com/apache/datafusion/blob/main/datafusion/common/src/utils/mod.rs#L518-L530)
 builds a fresh `Field::new(name, data_type, nullable)` with empty metadata.
   
   The all-null path in 
[`min_max_batch_generic`](https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate-common/src/min_max.rs#L779-L781)
 goes through `ScalarValue::try_from(array.data_type())` → 
[`try_new_null`](https://github.com/apache/datafusion/blob/main/datafusion/common/src/scalar/mod.rs#L1310-L1312):
   
   ```rust
   DataType::List(field_ref) => ScalarValue::List(Arc::new(
       GenericListArray::new_null(Arc::clone(field_ref), 1),
   )),
   ```
   
   `Arc::clone(field_ref)` preserves the full Field, so metadata survives. The 
two paths return ScalarValues with different inner-Field metadata for the same 
logical column.
   
   ### To Reproduce
   
   `Cargo.toml`:
   
   ```toml
   [dependencies]
   datafusion = "53.1"
   tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
   ```
   
   `src/main.rs`:
   
   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;
   
    use datafusion::arrow::array::{Array, ArrayRef, ListArray, RecordBatch, StringArray};
   use datafusion::arrow::buffer::OffsetBuffer;
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::datasource::MemTable;
   use datafusion::functions_aggregate::min_max::MinAccumulator;
   use datafusion::logical_expr::Accumulator;
   use datafusion::prelude::SessionContext;
   use datafusion::scalar::ScalarValue;
   
   fn inner_with_metadata() -> Arc<Field> {
       let mut md = HashMap::new();
       md.insert("ARROW:extension:name".to_string(), "arrow.json".to_string());
       md.insert("ARROW:extension:metadata".to_string(), "{}".to_string());
       Arc::new(Field::new("item", DataType::Utf8, true).with_metadata(md))
   }
   
   fn dump(label: &str, sv: &ScalarValue) {
       if let ScalarValue::List(arr) = sv {
           if let DataType::List(inner) = arr.data_type() {
                println!("  {label}: inner_field metadata={:?}", inner.metadata());
               return;
           }
       }
       println!("  {label}: {sv:?}");
   }
   
   fn unit_repro() {
       println!("== Direct MinAccumulator reproducer ==");
       let inner = inner_with_metadata();
       let list_dt = DataType::List(Arc::clone(&inner));
   
       // Row group 1: all NULL → metadata preserved
       let mut acc1 = MinAccumulator::try_new(&list_dt).unwrap();
        let null_arr: ArrayRef = Arc::new(ListArray::new_null(Arc::clone(&inner), 2));
       acc1.update_batch(&[null_arr]).unwrap();
       let sv1 = acc1.evaluate().unwrap();
       dump("row group 1 (all-null)", &sv1);
   
       // Row group 2: has values → metadata stripped
       let mut acc2 = MinAccumulator::try_new(&list_dt).unwrap();
        let values: ArrayRef = Arc::new(StringArray::from(vec!["alpha", "beta"]));
       let offsets = OffsetBuffer::new(vec![0_i32, 1, 2].into());
        let arr_vals: ArrayRef = Arc::new(ListArray::new(Arc::clone(&inner), offsets, values, None));
       acc2.update_batch(&[arr_vals]).unwrap();
       let sv2 = acc2.evaluate().unwrap();
       dump("row group 2 (values)", &sv2);
   
       println!(
           "  iter_to_array: {:?}",
            ScalarValue::iter_to_array(vec![sv1, sv2]).map(|a| a.data_type().clone())
       );
   }
   
   async fn sql_repro() -> datafusion::error::Result<()> {
       println!();
       println!("== SQL MIN() reproducer ==");
       let inner = inner_with_metadata();
       let list_dt = DataType::List(Arc::clone(&inner));
        let schema = Arc::new(Schema::new(vec![Field::new("col", list_dt, true)]));
   
       // Two partitions: one all-null row, one non-null row.
        let null_arr: ArrayRef = Arc::new(ListArray::new_null(Arc::clone(&inner), 1));
       let batch_a = RecordBatch::try_new(schema.clone(), vec![null_arr])?;
   
       let values: ArrayRef = Arc::new(StringArray::from(vec!["alpha"]));
       let offsets = OffsetBuffer::new(vec![0_i32, 1].into());
        let list_arr: ArrayRef = Arc::new(ListArray::new(Arc::clone(&inner), offsets, values, None));
       let batch_b = RecordBatch::try_new(schema.clone(), vec![list_arr])?;
   
        let table = MemTable::try_new(schema, vec![vec![batch_a], vec![batch_b]])?;
       let ctx = SessionContext::new();
       ctx.register_table("t", Arc::new(table))?;
   
       let df = ctx.sql("SELECT MIN(col) AS m FROM t").await?;
       match df.collect().await {
           Ok(batches) => println!("  result: {batches:?}"),
           Err(e) => println!("  SQL FAILED: {e}"),
       }
       Ok(())
   }
   
   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       unit_repro();
       sql_repro().await
   }
   ```
   
   ### Expected behavior
   
   `MinAccumulator` over a `List<Utf8>` column should produce 
`ScalarValue::List` values with consistent inner-Field metadata across row 
groups, regardless of whether a particular row group is all-null. 
`ScalarValue::iter_to_array` over those values should succeed, and `SELECT 
MIN(col)` over a multi-partition table should not error with "cannot compare 
values of different or incompatible types".
   
   ### Actual output
   
   ```
   == Direct MinAccumulator reproducer ==
      row group 1 (all-null): inner_field metadata={"ARROW:extension:name": "arrow.json", "ARROW:extension:metadata": "{}"}
     row group 2 (values): inner_field metadata={}
      iter_to_array: Err(ArrowError(InvalidArgumentError("It is not possible to concatenate arrays of different data types (List(Utf8, metadata: {\"ARROW:extension:metadata\": \"{}\", \"ARROW:extension:name\": \"arrow.json\"}), List(Utf8)).")))
   
   == SQL MIN() reproducer ==
      SQL FAILED: Internal error: cannot compare values of different or incompatible types: List() vs List([alpha]).
   ```
   
   ### Suggested fix
   
   Extend `SingleRowListArrayBuilder` to carry a full `Arc<Field>` (not just 
`field_name` + `nullable`) and use it directly in `into_field_and_arr` rather 
than constructing a fresh `Field`. The existing `with_field` callers in 
`try_from_array` for `List` / `LargeList` / `FixedSizeList` / `ListView` / 
`LargeListView` (and the `try_new_null` arms for `FixedSizeList` / `ListView` / 
`LargeListView` that use `with_field`) would then preserve the inner-Field 
metadata.
   
   `Struct`, `Map`, `Union`, and `RunEndEncoded` arms of `try_from_array` 
already preserve metadata (via `array.slice` or explicit `Arc::clone`), so the 
fix is scoped to the `SingleRowListArrayBuilder` path.
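   
   A sketch of the shape of that change (hypothetical; the builder's actual 
internal state and field names may differ from these assumptions):
   
    ```rust
    // Hypothetical sketch of the fix; internal field names are assumptions.
    pub struct SingleRowListArrayBuilder {
        arr: ArrayRef,
        field: Option<FieldRef>, // new: carry the full source Field
        // ...existing field_name / nullable state...
    }

    impl SingleRowListArrayBuilder {
        pub fn with_field(mut self, field: &Field) -> Self {
            // Store the whole Field (metadata included) instead of
            // copying only name + nullability.
            self.field = Some(Arc::new(field.clone()));
            self
        }

        fn into_field_and_arr(self) -> (FieldRef, ArrayRef) {
            let data_type = self.arr.data_type().clone();
            let field = match &self.field {
                // Reuse the stored Field, updating only the data type to
                // match the wrapped array; metadata survives.
                Some(f) => Arc::new(f.as_ref().clone().with_data_type(data_type)),
                // Fall back to the existing fresh-Field construction.
                None => Arc::new(Field::new("item", data_type, true)),
            };
            (field, self.arr)
        }
    }
    ```
   
   With the Field carried through, the non-null `try_from_array` path would 
produce the same inner-Field metadata as `try_new_null`.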
   
   ### Additional context
   
   - Surfaced in production while writing zone-map-style stats files: 
per-row-group `MinAccumulator::evaluate()` results are concatenated via 
`ScalarValue::iter_to_array` to form a stats batch. Mixed metadata across row 
groups makes the concat fail at write time. This is what got us to track down 
the underlying mechanism.
   - Confirmed: the bug exists in 53.1.0 (latest release as of filing) and on 
`main` (`with_field`, lines 449-452).
   - Datafusion version tested: `datafusion = "53.1"` from crates.io.
   - Rust version: stable.
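   
   Until a fix lands, one possible caller-side workaround is to rebuild each 
per-row-group `ScalarValue::List` with the expected inner Field before 
concatenation. A sketch, assuming the current representation where 
`ScalarValue::List` wraps an `Arc<ListArray>`:
   
    ```rust
    use std::sync::Arc;

    use datafusion::arrow::array::ListArray;
    use datafusion::arrow::datatypes::FieldRef;
    use datafusion::scalar::ScalarValue;

    /// Rebuild a `ScalarValue::List` so its inner Field (metadata included)
    /// matches `inner`; other variants pass through unchanged.
    fn normalize_list_scalar(sv: ScalarValue, inner: &FieldRef) -> ScalarValue {
        match sv {
            ScalarValue::List(arr) => {
                // Decompose the single-row list and reassemble it around
                // the caller-supplied Field.
                let (_, offsets, values, nulls) = arr.as_ref().clone().into_parts();
                ScalarValue::List(Arc::new(ListArray::new(
                    Arc::clone(inner),
                    offsets,
                    values,
                    nulls,
                )))
            }
            other => other,
        }
    }
    ```
   
   Applied to each `evaluate()` result before `ScalarValue::iter_to_array`, 
this should make the concat succeed regardless of which path produced the 
scalar.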
   
   ### API breaking changes
   
   No (the proposed fix adds a field to `SingleRowListArrayBuilder` and changes 
`with_field`'s body; signature is unchanged).
   
   ### Component(s)
   
   `common`, `functions-aggregate`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
