mbutrovich commented on PR #21962:
URL: https://github.com/apache/datafusion/pull/21962#issuecomment-4353914929

   Thanks @comphead! The `MergeAsPartialUDF` wrapper is a clever way to get 
`PartialMerge` semantics without reaching into DataFusion's aggregation logic.
   
   ### Streaming shim asymmetry
   
   I checked the Spark source at `v3.5.8`, `v4.0.2`, and `v4.1.1`. 
`StreamSourceAwareSparkPlan` doesn't exist before 4.x, so needing a different 
3.x approach is unavoidable. The shapes of the two checks do differ in a way 
worth flagging though.
   
   On 3.x the shim does `plan.logicalLink.exists(_.isStreaming)`. That only 
looks at the logical link of the root physical node. `LogicalPlan.isStreaming` 
does recurse through logical children, but if the root physical node has 
`logicalLink = None` (a rule built a fresh node without copying the link) the 
check silently returns `false`.
   
   On 4.x the shim does `plan.exists { case p: StreamSourceAwareSparkPlan => 
p.getStream.isDefined }`. `SparkPlan.exists` walks the whole physical tree, and 
the leaf check does not depend on logical link preservation.
   
   Would `plan.exists(_.logicalLink.exists(_.isStreaming))` on 3.x be a safer 
analogue, or is there a reason you only want to inspect the root? Totally fine 
if this is out of scope for the PartialMerge work, just wanted to surface it.
   
   ### `sum_int.rs` merge_batch rewrite
   
   The prior `merge_batch` only read row 0 of the state array, which looks like 
a latent bug whenever a merge batch had more than one row. Switching to 
`update_batch(states)` fixes that. Do you think it's worth adding a small 
direct test that exercises `merge_batch` with a multi-row state array 
independent of the `PartialMerge` plumbing, so the fix is captured even if the 
`MergeAsPartial` path changes later?
   
   ### `collect_set` in the description
   
   The description lists `FIRST` / `LAST` / `collect_set` as falling back, but 
the serde only filters `First` and `Last`. Is `collect_set` covered entirely by 
the streaming plan fallback? If so, a line of comment near the `First`/`Last` 
filter explaining the split would help future readers.
   
   ### `distinct` test lost its aggregate count assertion
   
   The previous version used `checkSparkAnswerAndNumOfAggregates` to assert how 
many native aggregate stages ran. The new version drops that. Since this PR is 
specifically about enabling more stages to stay native, would it be worth 
keeping a mode count assertion in one or two of the cases so a future 
regression that silently falls back one stage still shows up? Same thread as 
the existing review comment.
   
   ### Test name mismatch
   
   `partialMerge - distinct + non-distinct with first() FILTER (Expand 
pattern)` doesn't use `FIRST` or `FILTER` in the queries it runs. Was that the 
intended coverage, or did the queries drift from the original intent?
   
   ### Leftover configs
   
   In `partialMerge - cnt distinct + sum`, are 
`spark.comet.cast.allowIncompatible` and 
`spark.comet.expression.Cast.allowIncompatible` actually needed? Looks like 
they might be leftover from local debugging.
   
   ### AVG(DISTINCT) coverage
   
   The serde relaxation also admits `SUM(DISTINCT x)` and `AVG(DISTINCT x)` for 
single column. The `distinct` test covers `COUNT` and `SUM` variants but I 
didn't see `AVG(DISTINCT)` exercised. Could we add one case, ideally mixed with 
a non distinct aggregate like `AVG(DISTINCT a) + SUM(b)`, so the Expand plan is 
exercised for a non count distinct?
   
   ### `planner.rs` mode literals
   
   The match on `agg.mode` uses `0`, `1`, `2` directly. Would it be cleaner to 
match on the generated `AggregateMode` enum so the proto and native sides can't 
desync silently?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to