andygrove opened a new issue, #4724:

URL: https://github.com/apache/datafusion-comet/issues/4724
## Describe the bug

`collect_list` / `collect_set` declare their aggregate buffer as `BinaryType` in Spark (serialized `TypedImperativeAggregate` state), but produce a native `ArrayType` (Arrow `List`) state in Comet. Comet bridges this only for the simple two-stage shape: `CometObjectHashAggregateExec.adjustOutputForNativeState` rewrites the buffer column of a **pure-`Partial`** aggregate to `ArrayType`, so `Partial -> Final` runs natively and correctly.

It does **not** handle multi-stage aggregates that contain a `PartialMerge` stage, which Spark introduces for the distinct-aggregate rewrite, e.g.:

```sql
SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x
```

This plans as `Partial(x,y) -> PartialMerge(x,y) -> [PartialMerge, Partial](x) -> Final(x)`. The intermediate `PartialMerge` outputs are still declared `BinaryType`, so a fully native pipeline crashes at runtime:

```
CometNativeException: Cast error: Cannot cast LIST to non-list data type Binary
```

There is also a related nullability drift, `List(non-null T)` vs `List(nullable T)`, for nested element types. This is one facet of the broader Arrow type drift tracked in #4515.

This affects both `collect_list` (added in #4720) and the already-shipped `collect_set`; `collect_set` has the same latent crash on this shape.

## Current behavior (workaround in #4720)

To avoid the crash, multi-stage `collect_list`/`collect_set` aggregates now consistently fall back to Spark:

- `CometExecRule.tagUnsafePartialAggregates` tags the feeding pure-`Partial` aggregate for fallback when a `PartialMerge` stage of a `CollectList`/`CollectSet` aggregate is present.
- `CometBaseAggregate.doConvert` falls back to Spark for a `PartialMerge`/`Final` stage of these functions when no Comet partial aggregate produced the buffer (the cross-engine `LocalTableScan` case).

The simple two-stage `collect_list`/`collect_set` cases continue to run natively.
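The gap between declared and native buffer types can be illustrated with a small, self-contained Rust model. Everything in it (`Mode`, `BufType`, `declared_output`, `native_output`, `first_mismatch`, `must_fall_back`) is hypothetical and heavily simplified: it assumes every stage carries a `collect_list`/`collect_set` aggregate, declares a `List` buffer only for a pure-`Partial` stage (mirroring the current `adjustOutputForNativeState` behavior described above), and approximates the #4720 guard as "fall back if any stage runs `PartialMerge`". It is a sketch of the failure mode, not Comet's planner code.

```rust
// Hypothetical model of the current behavior for collect_list/collect_set.
// These names are illustrative stand-ins, not Comet's real classes.

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Mode {
    Partial,
    PartialMerge,
    Final,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BufType {
    Binary, // Spark's declared TypedImperativeAggregate buffer
    List,   // the Arrow List state the native operator actually emits
}

/// Buffer type the plan declares for a stage's output (None = Final-only
/// stage, which emits the result rather than a buffer).
/// Current rule: only a pure-Partial stage is rewritten to List.
fn declared_output(modes: &[Mode]) -> Option<BufType> {
    if modes.iter().all(|m| *m == Mode::Final) {
        None
    } else if modes.iter().all(|m| *m == Mode::Partial) {
        Some(BufType::List)
    } else {
        Some(BufType::Binary)
    }
}

/// Buffer type the native operator produces for a stage's output.
fn native_output(modes: &[Mode]) -> Option<BufType> {
    if modes.iter().all(|m| *m == Mode::Final) {
        None
    } else {
        Some(BufType::List)
    }
}

/// Index of the first stage whose native List output meets a declared
/// Binary column (where the LIST -> Binary cast error would be expected).
fn first_mismatch(plan: &[Vec<Mode>]) -> Option<usize> {
    plan.iter()
        .position(|modes| declared_output(modes) != native_output(modes))
}

/// Coarse approximation of the workaround: fall back to Spark if any
/// stage runs PartialMerge.
fn must_fall_back(plan: &[Vec<Mode>]) -> bool {
    plan.iter().any(|modes| modes.contains(&Mode::PartialMerge))
}

fn main() {
    use Mode::*;
    // SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x
    let distinct_plan = vec![
        vec![Partial],
        vec![PartialMerge],
        vec![PartialMerge, Partial],
        vec![Final],
    ];
    let two_stage = vec![vec![Partial], vec![Final]];

    // prints: Some(1) true
    println!("{:?} {}", first_mismatch(&distinct_plan), must_fall_back(&distinct_plan));
    // prints: None false
    println!("{:?} {}", first_mismatch(&two_stage), must_fall_back(&two_stage));
}
```

For the distinct-aggregate shape the model flags stage 1 (the first `PartialMerge`) as the first point where a native `List` meets a declared `Binary` column, while the two-stage shape has no mismatch and needs no fallback.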
## Expected behavior

Enable fully native multi-stage execution by correcting the intermediate buffer schema for `PartialMerge` and mixed-mode stages (such as `[PartialMerge, Partial]`) of `CollectList`/`CollectSet`: extend `adjustOutputForNativeState` beyond pure-`Partial` aggregates and fix the element-nullability drift, so that the `ArrayType` buffer round-trips across all stages. Then remove the fallback guards added in #4720.

## Additional context

- Related: #4515 (DataFusion / datafusion-spark functions whose Arrow return type drifts from Spark catalyst's declared type)
- Introduced alongside: #4720 (native `collect_list` / `array_agg`)
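The fix proposed under *Expected behavior* can be sketched as a hypothetical Rust model: extend the buffer rewrite from pure-`Partial` stages to every stage that emits intermediate state, and normalize list element nullability recursively so that `List(non-null T)` and `List(nullable T)` compare equal. All names (`DataType`, `list`, `normalize`, `declared_output`, `round_trips`, `CANONICAL_ELEM_NULLABLE`) are illustrative assumptions; the per-stage nullability drift in `main` is invented for the example, and picking nullable elements as the canonical form is arbitrary, since the issue does not say which side should win.

```rust
// Hypothetical sketch of the proposed fix. All names are illustrative
// stand-ins, not Comet's real classes.

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Mode {
    Partial,
    PartialMerge,
    Final,
}

#[derive(Clone, Debug, PartialEq, Eq)]
enum DataType {
    Int,
    Binary,
    List { elem: Box<DataType>, elem_nullable: bool },
}

/// Assumption: canonical element nullability used on every stage boundary.
const CANONICAL_ELEM_NULLABLE: bool = true;

fn list(elem: DataType, elem_nullable: bool) -> DataType {
    DataType::List { elem: Box::new(elem), elem_nullable }
}

/// Recursively pin every list element's nullability to the canonical value,
/// so List(non-null T) and List(nullable T) become the same type.
fn normalize(t: &DataType) -> DataType {
    match t {
        DataType::List { elem, .. } => list(normalize(elem), CANONICAL_ELEM_NULLABLE),
        other => other.clone(),
    }
}

/// A stage emits buffer state unless every function in it runs in Final mode.
fn emits_state(modes: &[Mode]) -> bool {
    !modes.iter().all(|m| *m == Mode::Final)
}

/// Proposed rule: for any state-emitting stage (pure Partial, PartialMerge,
/// or mixed), replace a declared Binary buffer with the normalized native
/// list type. Non-Binary buffers are left unchanged; Final-only stages
/// emit no buffer.
fn declared_output(modes: &[Mode], spark_buffer: &DataType, native: &DataType) -> Option<DataType> {
    if !emits_state(modes) {
        return None;
    }
    match spark_buffer {
        DataType::Binary => Some(normalize(native)),
        other => Some(other.clone()),
    }
}

/// The buffer round-trips if every state-emitting stage declares the same
/// type, so each stage's output matches what the next stage merges.
fn round_trips(plan: &[(Vec<Mode>, DataType)]) -> bool {
    let declared: Vec<DataType> = plan
        .iter()
        .filter_map(|(modes, native)| declared_output(modes, &DataType::Binary, native))
        .collect();
    declared.windows(2).all(|w| w[0] == w[1])
}

fn main() {
    use Mode::*;
    // Invented drift for illustration: collect_list over an array<int>
    // column, where stages report different element nullability.
    let strict = list(list(DataType::Int, false), false);
    let loose = list(list(DataType::Int, true), true);
    let plan = vec![
        (vec![Partial], strict.clone()),
        (vec![PartialMerge], loose.clone()),
        (vec![PartialMerge, Partial], loose.clone()),
        (vec![Final], loose.clone()),
    ];
    println!("raw types equal: {}", strict == loose); // prints: raw types equal: false
    println!("round trips: {}", round_trips(&plan)); // prints: round trips: true
}
```

This sketch normalizes both sides to one canonical type rather than relaxing type comparison at each boundary, so every state-emitting stage declares exactly the same buffer type, including the mixed `[PartialMerge, Partial]` stage of the distinct-aggregate plan.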
