gstvg commented on PR #21679:
URL: https://github.com/apache/datafusion/pull/21679#issuecomment-4286551740

   > ... requiring the user to provide a valid typed value, same as other 
engines.
   
   @LiaCastaneda  I thought they didn't require the exact type, but after 
checking spark and clickhouse, they do require it, thanks for the info. So 
answering the initial question of @rluvaton and @pepijnve, the current design 
does allow to implement the spark reduce by using initial_value type as the 
merge accumulator type and the finish only argument type, as spark itself does: 
   
    [checking initial_value/zero and merge output to 
match](https://github.com/apache/spark/blob/96581a5b9611d28518eb22c66add581b9724a899/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala#L840-L841)
   [using initial_value/zero as accumulator 
type](https://github.com/apache/spark/blob/96581a5b9611d28518eb22c66add581b9724a899/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala#L861-L863)
   [test with explicit casts (the only test with arrays that I 
found)](https://github.com/apache/spark/blob/fd472f1ca39113234cbf3f647ebb0ecb6d0bee74/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala#L198)
   
   But it would be nicer to not require explicit casts, 
[snowflake](https://docs.snowflake.com/en/sql-reference/functions/reduce#use-an-array-for-the-accumulator)
 and the [next release of 
duckdb](https://www.github.com/duckdb/duckdb/pull/21032/changes#diff-e037d097f599d5e8b7499ad31c4325156fef4f5327760310907140f6658ca1caR78-R82)
 will support that, so I agree that we should at least plan to support it 
without requiring breaking changes in the future.
   
   My WIP branch is currently using this:
   
   <details>
   
   ```rust
   enum LambdaParametersProgress {
       // the parameters of some lambdas are unkrnow 
       Partial(Vec<Option<Vec<FieldRef>>>),
       // the parameters of all lambdas are know
       Complete(Vec<Vec<FieldRef>>),
   }
   
   trait HigherOrderUDF {
       fn lambda_parameters(
           &self,
           step: usize,
           // Some = the return field of a lambda that could be computed 
because it's parameters have been returned in some previous call 
          // None = the previous call didn't returned the parameters of the 
lambda
           fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
       ) -> Result<LambdaParametersProgress>;
   }
   
   impl HigherOrderUDF for ArrayReduce {
       fn lambda_parameters(
           &self,
           step: usize,
           fields: &[ValueOrLambda<FieldRef, Option<FieldRef>>],
       ) -> Result<LambdaParametersProgress> {
           // optional finish not handled for simplicity
           let [
               ValueOrLambda::Value(list),
               ValueOrLambda::Value(initial_value),
               ValueOrLambda::Lambda(merge),
               ValueOrLambda::Lambda(_finish),
           ] = fields
           else {
               return plan_err!(
                   "reduce expects a list value, then an initial value, then a 
merge lambda and finally a finish lambda"
               );
           };
   
           let list_field = match list.data_type() {
               // currently only fsl supported
               DataType::FixedSizeList(field, _) => field,
               _ => return plan_err!("reduce expects a list as it's first 
argument"),
           };
   
           Ok(match (step, merge) {
               (0, _) => {
                   // at the first step, we use the initial_value as merge 
accumulator,
                   // and return None for finish since we don't know the output 
of merge
                   LambdaParametersProgress::Partial(vec![
                       // merge
                       Some(vec![Arc::clone(initial_value), 
Arc::clone(list_field)]),
                       // finish
                       None,
                   ])
               }
               (1, Some(accumulator)) => {
                   // now we can use the merge output as it's accumulator and
                   // as the finish parameter
                   LambdaParametersProgress::Complete(vec![
                       // merge
                       vec![Arc::clone(accumulator), Arc::clone(list_field)],
                       // finish
                       vec![Arc::clone(accumulator)],
                   ])
               }
               (1, None) => {
                   return plan_err!("merge should be resolved at reduce step 1 
(0-based)");
               }
               _ => todo!(),
           })
       }
   }
   ```
   
   </details>
   
   I hope to push this tomorrow to another branch with the CI running with a 
minimal implementation of array_reduce


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to