gstvg commented on code in PR #21679:
URL: https://github.com/apache/datafusion/pull/21679#discussion_r3152006719


##########
datafusion/sql/src/expr/function.rs:
##########
@@ -413,84 +412,116 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                 })
                 .collect::<Result<Vec<_>>>()?;
 
-            let coerced_values =
+            let mut fields =
                 value_fields_with_higher_order_udf(&current_fields, 
fm.as_ref())?
                     .into_iter()
-                    .filter_map(|arg| match arg {
-                        ValueOrLambda::Value(value) => Some(value),
-                        ValueOrLambda::Lambda(_lambda) => None,
+                    .map(|arg| match arg {
+                        ValueOrLambda::Value(value) => 
ValueOrLambda::Value(value),
+                        ValueOrLambda::Lambda(_lambda) => 
ValueOrLambda::Lambda(None),
                     })
                     .collect::<Vec<_>>();
 
-            // lambda_parameters refers only to lambdas and not to values, so 
instead
-            // of zipping it with partially_planned, we iterate over 
partially_planned and only
-            // consume from lambda_parameters when a given argument is a lambda
-            // to reconstruct the arguments list with the correct order
-            // this supports any value and lambda positioning including
-            // multiple lambdas interleaved with values
-            let mut lambda_parameters =
-                fm.lambda_parameters(&coerced_values)?.into_iter();
-
-            let num_lambdas = partially_planned.len() - coerced_values.len();
-
-            // functions can support multiple lambdas where some trailing ones 
are optional,
-            // but to simplify the implementor, lambda_parameters returns the 
parameters of all of them,
-            // so we can't do equality check. one example is spark reduce:
-            // https://spark.apache.org/docs/latest/api/sql/index.html#reduce
-            if lambda_parameters.len() < num_lambdas {
-                return plan_err!(
-                    "{} invocation defined {num_lambdas} but lambda_parameters 
returned only {}",
-                    fm.name(),
-                    lambda_parameters.len()
-                );
-            }
+            let lambda_count = partially_planned
+                .iter()
+                .filter(|a| matches!(a, ExprOrLambda::Lambda(_)))
+                .count();
 
-            let args = partially_planned
-                .into_iter()
-                .map(|arg| match arg {
-                    ExprOrLambda::Expr(expr) => Ok(expr),
-                    ExprOrLambda::Lambda(lambda) => {
-                        let lambda_params =
-                            lambda_parameters.next().ok_or_else(|| {
-                                internal_datafusion_err!(
-                                    "lambda_parameters len should have been 
checked above"
-                                )
-                            })?;
-
-                        if lambda.params.len() > lambda_params.len() {
+            let mut step = 0;
+
+            let args = loop {
+                match fm.lambda_parameters(step, &fields)? {
+                    LambdaParametersProgress::Partial(params) => {
+                        let mut params = params.into_iter();
+
+                        if params.len() != lambda_count {
                             return plan_err!(
-                                "lambda defined {} params but UDF support only 
{}",
-                                lambda.params.len(),
-                                lambda_params.len()
+                                "{} lambda_parameters returned {} lambdas but 
{lambda_count} expected",
+                                fm.name(),
+                                params.len()
                             );
                         }
 
-                        let params = lambda
-                            .params
-                            .iter()
-                            .map(|p| crate::utils::normalize_ident(p.clone()))
-                            .collect();
+                        for (arg, field) in
+                            std::iter::zip(&partially_planned, &mut fields)
+                        {
+                            match (arg, field) {
+                                (
+                                    ExprOrLambda::Lambda(lambda),
+                                    ValueOrLambda::Lambda(field),
+                                ) => {
+                                    let params = params.next().ok_or_else(|| {
+                                        internal_datafusion_err!(
+                                            "params len should have been 
checked above"
+                                        )
+                                    })?;
+                                    if let Some(params) = params {
+                                        let lambda = 
self.sql_lambda_to_logical_lambda(
+                                            planner_context,
+                                            schema,
+                                            lambda.clone(),
+                                            params,
+                                        )?;
+
+                                        *field = 
Some(lambda.body.to_field(schema)?.1);
+                                    }
+                                }
+                                (ExprOrLambda::Expr(_), 
ValueOrLambda::Value(_)) => {} // nothing to do
+                                (ExprOrLambda::Expr(_), 
ValueOrLambda::Lambda(_)) => {
+                                    return internal_err!(
+                                        "value_fields_with_higher_order_udf 
returned a value for a lambda argument"
+                                    );
+                                }
+                                (ExprOrLambda::Lambda(_), 
ValueOrLambda::Value(_)) => {
+                                    return internal_err!(
+                                        "value_fields_with_higher_order_udf 
returned a lambda for a value argument"
+                                    );
+                                }
+                            }
+                        }
+                    }
+                    LambdaParametersProgress::Complete(params) => {
+                        let mut params = params.into_iter();
 
-                        let lambda_parameters = lambda_params
+                        if params.len() != lambda_count {
+                            return plan_err!(
+                                "{} lambda_parameters returned {} lambdas but 
{lambda_count} expected",
+                                fm.name(),
+                                params.len()
+                            );
+                        }
+
+                        break partially_planned
                             .into_iter()
-                            .zip(&params)
-                            .map(|(f, n)| Arc::new(f.with_name(n)));
-
-                        let mut planner_context = planner_context
-                            .clone()
-                            .with_lambda_parameters(lambda_parameters);
-
-                        Ok(Expr::Lambda(Lambda {
-                            params,
-                            body: Box::new(self.sql_expr_to_logical_expr(
-                                *lambda.body,
-                                schema,
-                                &mut planner_context,
-                            )?),
-                        }))
+                            .map(|arg| match arg {
+                                ExprOrLambda::Expr(expr) => Ok(expr),
+                                ExprOrLambda::Lambda(lambda) => {
+                                    let params = params.next().ok_or_else(|| {
+                                        internal_datafusion_err!(
+                                            "params len should have been 
checked above"
+                                        )
+                                    })?;
+
+                                    
Ok(Expr::Lambda(self.sql_lambda_to_logical_lambda(
+                                        planner_context,
+                                        schema,
+                                        lambda,
+                                        params,
+                                    )?))
+                                }
+                            })
+                            .collect::<Result<Vec<_>>>()?;
                     }
-                })
-                .collect::<Result<Vec<_>>>()?;
+                }
+
+                step += 1;
+
+                if step > 256 {
+                    return plan_err!(
+                        "{} lambda_parameters called 256 times without 
completion",
+                        fm.name()
+                    );
+                }
+            };

Review Comment:
   It's missing a comment, indeed. It's mostly a guard against a faulty 
lambda_parameters implementation that would cause an infinite loop by endlessly 
returning `Partial`s. I agree that making it configurable is better, but I kept 
256 as the default so that, ideally, nobody ever has to think about it: 
https://github.com/apache/datafusion/pull/21679/changes/9f19fedaa1d085a9254632df7b1069d4ff4202ce



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to