LiaCastaneda commented on issue #21172:
URL: https://github.com/apache/datafusion/issues/21172#issuecomment-4333459693

   We can also add the perf optimizations @comphead mentioned in the trait PR:
   > I tried to build the `array_exists` Spark function using the current approach: https://github.com/apache/datafusion/pull/21881
   >
   > Good thing it works. From a performance point of view there are some improvements that can be made for lambdas, but we can likely address them in a follow-up. @gstvg, it is up to you.
   >
   > ```
   >   Loops: hoisting & fusion
   >
   >   L1. Three passes over self.args in one evaluate call.
   >   datafusion/physical-expr/src/higher_order_function.rs:247, :269 and :309
   >   each iterate self.args, and the first and third passes call
   >   self.lambda_positions.contains(&i) (a linear scan) inside the loop body.
   >   For N args with L lambdas, that is O(N·L) per batch just to classify args,
   >   even though the classification is already known at construction time.
   >
   >   - Fix: replace lambda_positions: Vec<usize> with a per-arg enum cached on
   >     the struct:
   >
   >       enum ArgSlot { Value(Arc<dyn PhysicalExpr>), Lambda(Arc<LambdaExpr>) }
   >       slots: Vec<ArgSlot>,
   >
   >   - The three passes then become one fused loop: O(N) per batch, zero
   >     allocation for the lookup, and the slow wrapped_lambda tree walk
   >     (:459-474) collapses to a cached Arc<LambdaExpr>.
   >
   >   L2. Pass 2 is pure reshaping of pass 1 (arg_fields → fields, cloning Arcs).
   >   It can be folded into pass 1 by building the
   >   Vec<ValueOrLambda<FieldRef, Option<FieldRef>>> directly.
   >
   >   ---
   >
   >   Data structures: precompute / augment
   >
   >   D1. LambdaArgument::evaluate rebuilds the inner Schema on every call
   >   (datafusion/expr/src/higher_order_function.rs:256):
   >
   >       let schema = Arc::new(Schema::new(self.params.clone()));
   >
   >   Schema::new itself is cheap (an arrow-rs Schema holds only the fields and
   >   a metadata map; it keeps no name→index map), but each call still clones
   >   self.params and allocates a new Arc<Schema>. For a nested HOF
   >   (array_transform([[1,2]], a -> array_transform(a, b -> ...))) this runs
   >   once per outer sublist per batch. Build it once in LambdaArgument::new and
   >   store the Arc<Schema>.
   >
   >   D2. self.fun.clear_null_values() is called once per non-lambda arg per
   >   batch (:349). It is a &self method returning a constant bool, so cache it
   >   as a bool on HigherOrderFunctionExpr at construction.
   >
   >   D3. wrapped_lambda walks the expr tree on every evaluate; :253 and :315
   >   both call it. Pre-resolve the Arc<LambdaExpr> once in try_new_with_schema /
   >   with_new_children and store it, since lambdas don't change mid-execution.
   >
   >   D4. HigherOrderFunctionArgs::args and ::arg_fields are always a Vec, even
   >   for 2-arg HOFs (array_transform, array_exists, array_filter), which are the
   >   vast majority. Switching to smallvec::SmallVec<[_; 2]> eliminates a heap
   >   allocation per batch for the common shape.
   >
   >   ---
   >
   >   Logic: short-circuit / reorder / lazy
   >
   >   G1. lambda_parameters(0, &fields) runs unconditionally at :285, even when
   >   there are no lambdas. Guard it with
   >   if self.lambda_positions.is_empty() { skip }.
   >
   >   G2. conditional_arguments is specified but unused by the evaluator. The
   >   trait exposes short_circuits() + conditional_arguments(args) to mark args
   >   as lazy, but HigherOrderFunctionExpr::evaluate still eagerly calls
   >   arg.evaluate(batch) for every non-lambda value (:346), so a
   >   short-circuiting array_exists/array_any can't skip any remaining work
   >   today. Wire this up.
   >
   >   G3. The remove_list_null_values path (:349-355) is entered for every
   >   List/LargeList arg. The callee has a null_count == 0 fast path that
   >   returns list.clone(), but the caller still pays for an outer
   >   Arc::new + match. Add an array.null_count() == 0 short-circuit at the
   >   caller.
   >
   >   G4. Constant-body lambdas aren't specialized. array_transform([..5], v -> 42)
   >   builds a RecordBatch and evaluates the body over all values even though
   >   the body never references v. Detect "no LambdaVariable in body" at
   >   construction (the body must also be non-volatile, otherwise v -> random()
   >   would be wrongly replicated), evaluate the body on an empty batch once,
   >   and replicate the result.
   >
   >   ---
   >
   >   Expressions: word/SIMD parallelism & CSE
   >
   >   E1. Per-row predicate scans are elementwise today (see my compute_exists
   >   in array_exists.rs and the array_transform pattern). Replace the
   >   for i in start..end { if predicate.value(i) … } loops with Arrow's
   >   bit-level operations on the predicate's underlying BooleanBuffer:
   >
   >       let row_slice = predicate.slice(start, end - start);
   >       if row_slice.true_count() > 0      { TRUE }   // popcount per 64-bit word
   >       else if row_slice.null_count() > 0 { NULL }
   >       else                               { FALSE }
   >
   >   The same trick applies to any boolean-reducing HOF (exists, forall,
   >   filter size estimation).
   >
   >   E2. self.lambda_positions.contains(&i) is a repeated CSE target (L1 above
   >   covers it).
   >
   >   ---
   >
   >   Procedures: parallelism
   >
   >   P1. Flat lambda evaluation over list_values is embarrassingly parallel.
   >   For per-row-independent HOFs (transform, exists, filter, any, forall),
   >   list_values can be chunked, the lambda body evaluated per chunk in
   >   parallel, and the results concatenated. This reuses Arrow's existing
   >   kernels, which are thread-safe and hold no global lock (there is no GIL
   >   to release in Rust). It is gated on detecting side-effect-free lambdas
   >   (e.g. !is_volatile_node()).
   >
   >   P2. No HOF benchmarks exist (rg "HigherOrder" benches/ returns nothing).
   >   Before any of the above, add a Criterion bench for array_transform over
   >   {List, LargeList, FixedSizeList} × {10K, 1M rows} ×
   >   {scalar, columnar lambda body}. Without baseline numbers, "faster" is
   >   unmeasurable.
   >
   >   ---
   >
   >   Suggested priority order
   >
   >   1. D3 + L1 + L2 (cache ArgSlot, fold loops): one edit that removes the
   >      O(N·L) classification and two Vec reallocs per batch.
   >   2. D1 (cache lambda Schema): one-line fix, big win for nested HOFs.
   >   3. E1 (popcount on the predicate's boolean buffer): biggest per-element
   >      win on array_exists / filter / forall.
   >   4. P2 (add Criterion benches): required to validate any of the above.
   >   5. G2 (wire up conditional_arguments): needed before short-circuiting
   >      HOFs like exists can skip work.
   > ```
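
A minimal std-only sketch of L1 + L2 (plus the cached `Arc<LambdaExpr>` from D3). `ValueExpr`, `LambdaExpr`, `HofExpr` and the `FieldRef = Arc<str>` alias are hypothetical stand-ins, not DataFusion's actual types or signatures. Arguments are classified into `ArgSlot`s once, and the per-batch field computation is a single pass that builds the `ValueOrLambda` vector directly:

```rust
use std::sync::Arc;

// Hypothetical stand-ins: FieldRef models DataFusion's FieldRef,
// ValueExpr models Arc<dyn PhysicalExpr>, LambdaExpr models LambdaExpr.
pub type FieldRef = Arc<str>;

pub struct ValueExpr {
    pub field: FieldRef, // output field of a non-lambda argument
}

pub struct LambdaExpr {
    pub param_field: Option<FieldRef>, // None until the parameter type is known
}

/// Per-argument classification, decided once at construction (L1).
/// Holding Arc<LambdaExpr> directly also removes the per-call tree walk (D3).
pub enum ArgSlot {
    Value(Arc<ValueExpr>),
    Lambda(Arc<LambdaExpr>),
}

/// Same shape as the ValueOrLambda<FieldRef, Option<FieldRef>> named in L2.
#[derive(Debug, PartialEq)]
pub enum ValueOrLambda<V, L> {
    Value(V),
    Lambda(L),
}

pub struct HofExpr {
    slots: Vec<ArgSlot>,
}

impl HofExpr {
    pub fn new(slots: Vec<ArgSlot>) -> Self {
        Self { slots }
    }

    /// One fused pass per batch: no lambda_positions.contains(&i) scan and
    /// no intermediate Vec that a second pass has to reshape (L1 + L2).
    pub fn arg_fields(&self) -> Vec<ValueOrLambda<FieldRef, Option<FieldRef>>> {
        self.slots
            .iter()
            .map(|slot| match slot {
                ArgSlot::Value(v) => ValueOrLambda::Value(Arc::clone(&v.field)),
                ArgSlot::Lambda(l) => ValueOrLambda::Lambda(l.param_field.clone()),
            })
            .collect()
    }
}
```

In the real expression, the one-time classification would live in the constructor (where the `wrapped_lambda` lookup could run once), so `evaluate` never needs `lambda_positions.contains(&i)`.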
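
For D1 and D2 the pattern is the same: compute once in the constructor and hand out cheap copies afterwards. In this std-only sketch `Field`, `Schema`, `LambdaArgument`, `HigherOrderUdf` and `HofExpr` are simplified hypothetical stand-ins rather than the arrow-rs/DataFusion types, and D2 assumes `clear_null_values()` gives the same answer for the lifetime of a function instance:

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for arrow's Field and Schema.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
}

#[derive(Debug, PartialEq)]
pub struct Schema {
    pub fields: Vec<Field>,
}

/// D1: build the lambda's inner schema once, in the constructor.
pub struct LambdaArgument {
    schema: Arc<Schema>,
}

impl LambdaArgument {
    pub fn new(params: Vec<Field>) -> Self {
        Self { schema: Arc::new(Schema { fields: params }) }
    }

    /// Per-call cost is a refcount bump instead of a clone plus an allocation.
    pub fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }
}

/// Hypothetical trait exposing only the method D2 talks about.
pub trait HigherOrderUdf {
    fn clear_null_values(&self) -> bool;
}

/// D2: read clear_null_values() once and keep the answer as a plain bool.
pub struct HofExpr<F: HigherOrderUdf> {
    pub fun: F,
    clear_null_values: bool,
}

impl<F: HigherOrderUdf> HofExpr<F> {
    pub fn new(fun: F) -> Self {
        let clear_null_values = fun.clear_null_values();
        Self { fun, clear_null_values }
    }

    /// Called per non-lambda argument per batch; no trait call involved.
    pub fn should_clear_nulls(&self) -> bool {
        self.clear_null_values
    }
}
```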
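
A toy version of the G4 check, assuming a hypothetical four-variant `Expr` enum instead of DataFusion's `PhysicalExpr` tree. A body counts as constant only if it reads no lambda variable and has no volatile node; in that case it is evaluated once and the result replicated, otherwise it is evaluated per element as today:

```rust
/// Hypothetical, minimal lambda-body expression tree (not DataFusion's).
pub enum Expr {
    Literal(i64),
    LambdaVariable, // the lambda's single parameter, e.g. `v`
    Add(Box<Expr>, Box<Expr>),
    Volatile, // stands in for something like random()
}

impl Expr {
    /// Constant iff the body reads no lambda variable and has no volatile node.
    pub fn is_constant(&self) -> bool {
        match self {
            Expr::Literal(_) => true,
            Expr::LambdaVariable | Expr::Volatile => false,
            Expr::Add(a, b) => a.is_constant() && b.is_constant(),
        }
    }

    /// Evaluate for one binding of the lambda variable. `counter` models the
    /// state a volatile function would change on every call.
    fn eval(&self, v: i64, counter: &mut i64) -> i64 {
        match self {
            Expr::Literal(c) => *c,
            Expr::LambdaVariable => v,
            Expr::Add(a, b) => a.eval(v, counter) + b.eval(v, counter),
            Expr::Volatile => {
                *counter += 1;
                *counter
            }
        }
    }
}

/// array_transform over a flat values buffer, with the G4 fast path.
pub fn transform(values: &[i64], body: &Expr) -> Vec<i64> {
    let mut counter = 0;
    if body.is_constant() {
        // Evaluate once; the binding is ignored because the body never reads it.
        let c = body.eval(0, &mut counter);
        return vec![c; values.len()];
    }
    values.iter().map(|&v| body.eval(v, &mut counter)).collect()
}
```

For `v -> 42` over five values this yields `[42, 42, 42, 42, 42]` after a single evaluation, while a volatile body still produces a fresh value per element.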
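
To make E1 concrete without depending on the arrow crate, here is a std-only sketch of the bit arithmetic that `slice` + `true_count` + `null_count` amount to for one list row. It assumes LSB-first bitmaps stored as `u64` words with validity bit 1 meaning non-null, which mirrors Arrow's layout but is not Arrow's code; `Tri`, `count_ones_in_range` and `exists_for_row` are hypothetical names:

```rust
/// Three-valued result of a Spark-style `exists` over one list row.
#[derive(Debug, PartialEq)]
pub enum Tri {
    True,
    False,
    Null,
}

/// Count set bits in positions [start, end) of an LSB-first bitmap whose
/// 64-bit words are produced by `word_at`; one count_ones per word touched.
pub fn count_ones_in_range(word_at: impl Fn(usize) -> u64, start: usize, end: usize) -> usize {
    let mut count = 0;
    let mut i = start;
    while i < end {
        let bit = i % 64;
        let take = (64 - bit).min(end - i); // bits usable from this word
        let mask = if take == 64 {
            u64::MAX
        } else {
            ((1u64 << take) - 1) << bit
        };
        count += (word_at(i / 64) & mask).count_ones() as usize;
        i += take;
    }
    count
}

/// `exists` for the list row spanning [start, end) of the flattened predicate.
/// `values` holds predicate bits; `validity` has bit = 1 for non-null slots.
pub fn exists_for_row(values: &[u64], validity: &[u64], start: usize, end: usize) -> Tri {
    // Like BooleanArray::true_count: only slots that are both non-null and true.
    let true_count = count_ones_in_range(|w| values[w] & validity[w], start, end);
    if true_count > 0 {
        return Tri::True;
    }
    let non_null = count_ones_in_range(|w| validity[w], start, end);
    if non_null < end - start {
        Tri::Null
    } else {
        Tri::False
    }
}
```

Each 64-bit word touched costs one `count_ones` (a single popcount instruction on most targets) instead of up to 64 per-element branches. With the arrow crate, the three-line `BooleanArray` version quoted in E1 is what you would actually write.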
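
A sketch of the P1 chunk-and-concatenate shape. `std::thread::scope` stands in for whatever executor DataFusion would actually use, and `parallel_transform` over a plain `i64` buffer is hypothetical. The `Fn + Sync` bound is how the sketch models a side-effect-free body, so real code would still need the `!is_volatile_node()` check:

```rust
use std::thread;

/// Evaluate a per-element lambda body over `values` in `num_chunks` chunks on
/// scoped threads, then concatenate the chunk results in their original order.
pub fn parallel_transform<F>(values: &[i64], num_chunks: usize, body: F) -> Vec<i64>
where
    F: Fn(i64) -> i64 + Sync, // shared across threads; models a side-effect-free body
{
    let num_chunks = num_chunks.max(1);
    // Ceiling division, clamped to 1 because `chunks(0)` panics.
    let chunk_len = ((values.len() + num_chunks - 1) / num_chunks).max(1);
    let body = &body;
    thread::scope(|s| {
        let handles: Vec<_> = values
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&v| body(v)).collect::<Vec<i64>>()))
            .collect();
        let mut out = Vec::with_capacity(values.len());
        for handle in handles {
            out.extend(handle.join().expect("lambda worker panicked"));
        }
        out
    })
}
```

Because the handles are joined in spawn order, the concatenated output keeps the original element order, which the input's list offsets rely on.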
   

