comphead commented on PR #21679:
URL: https://github.com/apache/datafusion/pull/21679#issuecomment-4328923119

   I tried building the Spark `array_exists` function using the current approach:
https://github.com/apache/datafusion/pull/21881
   
   The good news is that it works. From a performance point of view, there are
some improvements that can be made for lambdas, but we can likely address them
in a follow-up. @gstvg, it is up to you.
   
   
   
   ```
     Loops: hoisting & fusion

     L1. Three passes over self.args in one evaluate call.
     datafusion/physical-expr/src/higher_order_function.rs:247, :269, :309 each
     iterate self.args, and the first and third passes each call
     self.lambda_positions.contains(&i) (a linear scan) inside the loop body.
     For N args with L lambdas, that's O(N·L) per batch just to classify args,
     even though the classification is already known at construction time.
                                              
     - Fix: replace lambda_positions: Vec<usize> with a per-arg enum cached on
       the struct:

           enum ArgSlot { Value(Arc<dyn PhysicalExpr>), Lambda(Arc<LambdaExpr>) }
           slots: Vec<ArgSlot>,

     - The three passes then become one fused loop: O(N) per batch, zero
       allocation for the lookup, and the slow wrapped_lambda tree walk
       (:459-474) collapses to a cached Arc<LambdaExpr>.
                                              
     L2. Pass 2 is pure reshaping of pass 1 (arg_fields → fields, cloning
     Arcs). It can be folded into pass 1 by building the
     Vec<ValueOrLambda<FieldRef, Option<FieldRef>>> directly.

     ---
                                              
     Data structures: precompute / augment

     D1. LambdaArgument::evaluate rebuilds the inner Schema on every call.
     datafusion/expr/src/higher_order_function.rs:256:

         let schema = Arc::new(Schema::new(self.params.clone()));

     Schema::new builds a name→index HashMap internally. For a nested HOF
     (array_transform([[1,2]], a -> array_transform(a, b -> ...))) this runs
     once per outer sublist per batch. Build it once in LambdaArgument::new
     and store Arc<Schema>.
                                              
     D2. self.fun.clear_null_values() is called once per non-lambda arg per
     batch (:349). It's a &self method returning a constant bool. Cache it as
     a bool on HigherOrderFunctionExpr at construction.
                                              
     D3. wrapped_lambda walks the expr tree on every evaluate; :253 and :315
     both call it. Pre-resolve the Arc<LambdaExpr> once in
     try_new_with_schema / with_new_children and store it, since lambdas
     don't change mid-execution.
                                              
     D4. HigherOrderFunctionArgs::args and ::arg_fields are always Vec, even
     for 2-arg HOFs (array_transform, array_exists, array_filter: the vast
     majority). Switching to smallvec::SmallVec<[_; 2]> eliminates a heap
     alloc per batch for the common shape.
                                              
     ---

     Logic: short-circuit / reorder / lazy

     G1. lambda_parameters(0, &fields) runs unconditionally at :285, even when
     there are no lambdas. Guard it with
     if self.lambda_positions.is_empty() { skip }.
                                              
     G2. conditional_arguments is specced but unused by the evaluator. The
     trait exposes short_circuits() + conditional_arguments(args) to mark
     args as lazy, but HigherOrderFunctionExpr::evaluate still eagerly calls
     arg.evaluate(batch) for every non-lambda value (:346). A
     short-circuiting array_exists/array_any can't benefit from skipping
     remaining work today. Wire this up.
                                              
     G3. The remove_list_null_values path (:349-355) is entered for every
     List/LargeList arg. The callee has a null_count == 0 fast path that
     returns list.clone(), but you still pay for an outer Arc::new + match.
     Add an array.null_count() == 0 short-circuit at the caller.
                                              
     G4. Constant-body lambdas aren't specialized.
     array_transform([..5], v -> 42) builds a RecordBatch and evaluates the
     body over all values even though the body doesn't reference v. Detect
     "no LambdaVariable in body" at construction, evaluate the body once on
     an empty batch, and replicate the result.
                                              
     ---

     Expressions: word/SIMD parallelism & CSE

     E1. Per-row predicate scans are elementwise today (see my compute_exists
     in array_exists.rs and the array_transform pattern). Replace the
     for i in start..end { if predicate.value(i) … } loops with Arrow's
     bit-level operations on the predicate's underlying BooleanBuffer:

         let row_slice = predicate.slice(start, end - start);
         if row_slice.true_count() > 0      { TRUE }   // SIMD popcount
         else if row_slice.null_count() > 0 { NULL }
         else                               { FALSE }
                                              
     The same trick applies to any boolean-reducing HOF (exists, forall,
     filter size estimation).
                                              
     E2. self.lambda_positions.contains(&i) is a repeated CSE target (L1 above
     covers it).

     ---
                                              
     Procedures: parallelism

     P1. Flat lambda evaluation over list_values is embarrassingly parallel.
     For per-row-independent HOFs (transform, exists, filter, any, forall),
     list_values can be chunked, the lambda body evaluated in parallel per
     chunk, and the results concatenated. This reuses Arrow's kernels that
     already release the GIL equivalent. It gates on detecting
     side-effect-free lambdas (e.g. !is_volatile_node()).
                                                               
     P2. No HOF benchmarks exist (rg "HigherOrder" benches/ returns nothing).
     Before any of the above, add a Criterion bench for array_transform over
     (List, LargeList, FixedSizeList × 10K/1M rows × scalar/columnar lambda
     body). Without baseline numbers, "faster" is unmeasurable.
                                              
     ---

     Suggested priority order

     1. D3 + L1 + L2 (cache ArgSlot, fold loops): one edit, removes the O(N·L)
        classification and two Vec reallocs per batch.
     2. D1 (cache lambda Schema): one-line fix, big win for nested HOFs.
     3. E1 (SIMD popcount on predicate boolean buffer): biggest per-element
        win on array_exists / filter / forall.
     4. P2 (add Criterion benches): required to validate any of the above.
     5. G2 (wire up conditional_arguments): needed before short-circuiting
        HOFs like exists can skip work.
   ```
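
To make the L1/D3 suggestion in the report a bit more concrete, here is a minimal std-only sketch of classifying args once at construction and then doing a single fused pass per batch. Everything in it is a stand-in: `HofExpr`, the `Arc<str>` payloads (in place of `Arc<dyn PhysicalExpr>` and `Arc<LambdaExpr>`), and the return shape of `evaluate` are hypothetical and are not DataFusion's actual types or signatures.

```rust
use std::sync::Arc;

/// Per-arg classification, decided once. The Arc<str> payloads are
/// hypothetical stand-ins for the real expression types.
#[derive(Debug)]
pub enum ArgSlot {
    Value(Arc<str>),
    Lambda(Arc<str>),
}

/// Hypothetical stand-in for HigherOrderFunctionExpr.
pub struct HofExpr {
    slots: Vec<ArgSlot>,
}

impl HofExpr {
    /// The O(N·L) `contains` scan happens here, once, instead of per batch.
    pub fn new(args: &[&str], lambda_positions: &[usize]) -> Self {
        let slots = args
            .iter()
            .enumerate()
            .map(|(i, a)| {
                let expr: Arc<str> = Arc::from(*a);
                if lambda_positions.contains(&i) {
                    ArgSlot::Lambda(expr)
                } else {
                    ArgSlot::Value(expr)
                }
            })
            .collect();
        Self { slots }
    }

    /// One fused pass: value args are "evaluated" (here just formatted),
    /// lambda args are handed back as already-resolved Arcs.
    pub fn evaluate(&self) -> (Vec<String>, Vec<Arc<str>>) {
        let mut values = Vec::with_capacity(self.slots.len());
        let mut lambdas = Vec::new();
        for slot in &self.slots {
            match slot {
                ArgSlot::Value(e) => values.push(format!("eval({e})")),
                ArgSlot::Lambda(l) => lambdas.push(Arc::clone(l)),
            }
        }
        (values, lambdas)
    }
}
```

With this shape, `evaluate` never consults `lambda_positions` again; whether the real struct can drop that field entirely depends on other callers, which this sketch does not model.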
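
The word-level idea behind E1 can also be sketched without Arrow. The block below models a nullable boolean array as two bit-packed `u64` buffers and reduces one list row with popcounts instead of a per-element loop. `Tri`, `BitPredicate`, `word_mask` and `exists` are hypothetical names for illustration only; in the real code the equivalent would presumably be the `slice` / `true_count` / `null_count` calls quoted in the report.

```rust
/// Three-valued result of reducing one list row with `exists`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Tri {
    True,
    False,
    Null,
}

/// Hypothetical model of a nullable boolean array: bit i of `values` is the
/// predicate result, bit i of `valid` is set when entry i is non-null.
pub struct BitPredicate {
    values: Vec<u64>,
    valid: Vec<u64>,
    len: usize,
}

impl BitPredicate {
    pub fn from_options(items: &[Option<bool>]) -> Self {
        let words = (items.len() + 63) / 64;
        let mut values = vec![0u64; words];
        let mut valid = vec![0u64; words];
        for (i, item) in items.iter().enumerate() {
            if let Some(b) = item {
                valid[i / 64] |= 1u64 << (i % 64);
                if *b {
                    values[i / 64] |= 1u64 << (i % 64);
                }
            }
        }
        Self { values, valid, len: items.len() }
    }
}

/// Mask selecting the bits of word `w` that fall inside [start, end).
fn word_mask(w: usize, start: usize, end: usize) -> u64 {
    let lo = start.max(w * 64) - w * 64; // first selected bit, 0..=63
    let hi = end.min((w + 1) * 64) - w * 64; // one past the last bit, 1..=64
    let upper = if hi == 64 { u64::MAX } else { (1u64 << hi) - 1 };
    upper & !((1u64 << lo) - 1)
}

/// `exists` over rows [start, end): TRUE if any valid true, else NULL if any
/// null, else FALSE. Works on 64 entries per step via count_ones.
pub fn exists(p: &BitPredicate, start: usize, end: usize) -> Tri {
    assert!(start <= end && end <= p.len);
    if start == end {
        return Tri::False;
    }
    let (mut trues, mut nulls) = (0u32, 0u32);
    for w in (start / 64)..=((end - 1) / 64) {
        let mask = word_mask(w, start, end);
        trues += (p.values[w] & p.valid[w] & mask).count_ones();
        nulls += (!p.valid[w] & mask).count_ones();
    }
    if trues > 0 {
        Tri::True
    } else if nulls > 0 {
        Tri::Null
    } else {
        Tri::False
    }
}
```

The loop could return early as soon as `trues > 0`; it is left as a full scan here to mirror the count-then-branch shape of the snippet in the report.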
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

