kosiew commented on code in PR #21375:
URL: https://github.com/apache/datafusion/pull/21375#discussion_r3171748021
##########
datafusion/sql/src/unparser/plan.rs:
##########
@@ -605,7 +610,181 @@ impl Unparser<'_> {
if self.dialect.unnest_as_lateral_flatten() {
Self::collect_flatten_aliases(p.input.as_ref(), select);
}
- self.reconstruct_select_statement(plan, p, select)?;
+ let found_agg = self.reconstruct_select_statement(plan, p, select)?;
+
+ // If the Projection claimed an Aggregate by reaching through
+ // one or more Limit / Sort nodes, fold their clauses into the
+ // current query and skip those nodes during recursion.
+ // Otherwise the Limit/Sort arm would see `already_projected`
+ // and wrap everything in a spurious derived subquery, emitting
+ // the aggregate twice.
+ //
+ // Repeated modifiers of the same kind are merged (rather than
+ // breaking out of the loop) so the chain still folds into a
+ // single SQL SELECT regardless of how the inputs were stacked:
+ // * stacked Limits combine via `combine_limit` semantics —
+ // the same merge `PushDownLimit` applies in the optimizer
+ // when both skip/fetch are integer literals;
+ // * stacked Sorts keep the outermost (top) one, since an
+ // inner Sort is reordered by the outer one.
+ // If we cannot combine literally (e.g. expression skip/fetch
+ // for the inner Limit), we stop and let recursion handle the
+ // remainder via a derived subquery.
+ if found_agg {
+ // Walk top-down through Limit / Sort nodes between the
+ // Projection and the Aggregate, folding each clause into
+ // the current SQL SELECT. We track combined_* for any
+ // literal Limits and apply once at the end so repeated
+ // Limits merge via `combine_limit` (the same rule used by
+ // the optimizer's `PushDownLimit`). A non-literal Limit
+ // is set on the query directly; if one is followed by
+ // another Limit we have no safe way to merge, so the
+ // loop terminates and recursion handles the remainder.
+ let mut cur = p.input.as_ref();
+ let mut combined_skip: usize = 0;
+ let mut combined_fetch: Option<usize> = None;
+ let mut have_combined_limit = false;
+ let mut have_direct_limit = false;
+ let mut have_order_by = false;
+ loop {
+ match cur {
+ LogicalPlan::Limit(limit) => {
Review Comment:
I think there is still a subtle semantic issue here.
Right now the loop flattens every Limit and Sort it encounters into the same
QueryBuilder, without considering their relative order. This can change
behavior for shapes like Projection -> Sort -> Limit -> Aggregate (and also
Projection -> Sort -> Limit -> Sort -> Aggregate).
In the logical plan, the lower Limit is applied before the outer Sort. But
in the emitted SQL, ORDER BY ... LIMIT ... applies the sort first and then the
limit.
For grouped aggregates, this can lead to different results. For example, the
aggregate might produce many groups, then Limit(10) keeps whichever 10 rows it
receives first, and finally the outer Sort orders just those 10. The current code
instead emits ORDER BY ... ASC LIMIT 10, which selects the 10 smallest rows
across all groups.
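A minimal sketch of the difference, using plain vectors rather than DataFusion plans. The function names and sample values are hypothetical and only illustrate the two evaluation orders:

```rust
/// Plan order for Sort -> Limit -> Aggregate: the Limit runs first and keeps
/// the first `n` rows it receives, then the outer Sort orders only those rows.
fn limit_then_sort(rows: &[i64], n: usize) -> Vec<i64> {
    let mut kept: Vec<i64> = rows.iter().copied().take(n).collect();
    kept.sort();
    kept
}

/// SQL order for `ORDER BY ... ASC LIMIT n`: every row is sorted first,
/// then the first `n` rows of the sorted result are kept.
fn sort_then_limit(rows: &[i64], n: usize) -> Vec<i64> {
    let mut sorted = rows.to_vec();
    sorted.sort();
    sorted.truncate(n);
    sorted
}

fn main() {
    // Pretend these are aggregate outputs, in the order the aggregate emits them.
    let groups = [50, 10, 40, 20, 30];
    println!("plan semantics: {:?}", limit_then_sort(&groups, 2));
    println!("flattened SQL:  {:?}", sort_then_limit(&groups, 2));
}
```

With this input the two functions return different rows, which is the mismatch the flattened query would introduce.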
There is a similar issue when a second Sort appears below the Limit, as in
Projection -> Sort -> Limit -> Sort -> Aggregate. The inner Sort is dropped in
favor of the already folded outer Sort, but it is the inner Sort that determines
which rows the Limit keeps.
It would be safer to flatten only chains whose folded SQL is equivalent to the
plan, like Limit -> Sort -> Aggregate. When a Limit sits below an outer Sort, we
should stop folding there and emit a derived subquery for the remainder.
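One way to express that rule, sketched on a toy enum rather than on `LogicalPlan`. `Modifier` and `foldable_prefix` are hypothetical names, not part of the PR or of DataFusion:

```rust
/// Toy stand-in for the Limit / Sort nodes found between the Projection and
/// the Aggregate, listed top-down (outermost first).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Modifier {
    Limit,
    Sort,
}

/// Returns how many leading nodes can be folded into a single SELECT.
/// Folding stops at the first Limit that sits below a Sort, because SQL
/// would apply that Sort before the Limit and change which rows survive.
fn foldable_prefix(chain: &[Modifier]) -> usize {
    let mut seen_sort = false;
    for (i, m) in chain.iter().enumerate() {
        match m {
            Modifier::Sort => seen_sort = true,
            // Limit boundary under an outer Sort: leave it to a derived subquery.
            Modifier::Limit if seen_sort => return i,
            // Limits above any Sort can still be merged together.
            Modifier::Limit => {}
        }
    }
    chain.len()
}

fn main() {
    use Modifier::{Limit, Sort};
    // Limit -> Sort -> Aggregate folds completely.
    println!("{}", foldable_prefix(&[Limit, Sort]));
    // Sort -> Limit -> Aggregate folds only the outer Sort.
    println!("{}", foldable_prefix(&[Sort, Limit]));
}
```

In the real loop, the equivalent check would be a `have_order_by` test in the `LogicalPlan::Limit` arm that breaks out before folding the Limit. That assumes `have_order_by` is set whenever a Sort has already been folded.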