cj-zhukov commented on code in PR #21021:
URL: https://github.com/apache/datafusion/pull/21021#discussion_r3008529964


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -410,21 +412,102 @@ impl DataFrame {
         expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
     ) -> Result<DataFrame> {
         let expr_list: Vec<SelectExpr> =
-            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
+            expr_list.into_iter().map(|e| e.into()).collect();
 
+        // Extract expressions
         let expressions = expr_list.iter().filter_map(|e| match e {
             SelectExpr::Expression(expr) => Some(expr),
             _ => None,
         });
 
-        let window_func_exprs = find_window_exprs(expressions);
-        let plan = if window_func_exprs.is_empty() {
+        // Apply window functions first
+        let window_func_exprs = find_window_exprs(expressions.clone());
+
+        let mut plan = if window_func_exprs.is_empty() {
             self.plan
         } else {
             LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
         };
 
-        let project_plan = 
LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+        // Collect aggregate expressions
+        let aggr_exprs = find_aggregate_exprs(expressions.clone());
+
+        // Check for non-aggregate expressions
+        let has_non_aggregate_expr = expressions
+            .clone()
+            .any(|expr| 
find_aggregate_exprs(std::iter::once(expr)).is_empty());
+
+        // Fallback to projection:
+        // - already aggregated
+        // - contains non-aggregate expressions
+        // - no aggregates
+        if matches!(plan, LogicalPlan::Aggregate(_))
+            || has_non_aggregate_expr
+            || aggr_exprs.is_empty()
+        {
+            let project_plan =
+                LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+
+            return Ok(DataFrame {
+                session_state: self.session_state,
+                plan: project_plan,
+                projection_requires_validation: false,
+            });
+        }
+
+        // Assign aliases to aggregate expressions
+        let mut aggr_map: HashMap<Expr, Expr> = HashMap::new();
+        let mut used_names = HashSet::new();
+        let aggr_exprs_with_alias: Vec<Expr> = aggr_exprs
+            .into_iter()
+            .map(|expr| {
+                let base_name = expr.name_for_alias()?;
+                let mut name = base_name.clone();
+                let mut counter = 1;
+                while used_names.contains(&name) {
+                    name = format!("{base_name}_{counter}");
+                    counter += 1;
+                }
+                used_names.insert(name.clone());
+                let aliased = expr.clone().alias(name.clone());
+                let col = Expr::Column(Column::from_name(name));
+                aggr_map.insert(expr, col);
+                Ok(aliased)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Build aggregate plan
+        plan = LogicalPlanBuilder::from(plan)
+            .aggregate(Vec::<Expr>::new(), aggr_exprs_with_alias)?
+            .build()?;
+
+        // Rewrite expressions to use aggregate outputs
+        let rewrite_expr = |expr: Expr, aggr_map: &HashMap<Expr, Expr>| -> 
Result<Expr> {
+            expr.transform(|e| {
+                Ok(match aggr_map.get(&e) {
+                    Some(replacement) => Transformed::yes(replacement.clone()),
+                    None => Transformed::no(e),
+                })
+            })
+            .map(|t| t.data)
+        };
+
+        let mut rewritten_exprs = Vec::with_capacity(expr_list.len());
+        for select_expr in expr_list.into_iter() {
+            match select_expr {
+                SelectExpr::Expression(expr) => {
+                    let rewritten = rewrite_expr(expr.clone(), &aggr_map)?;
+                    let alias = expr.name_for_alias()?;
+                    
rewritten_exprs.push(SelectExpr::Expression(rewritten.alias(alias)));

Review Comment:
   I agree with you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to