LiaCastaneda commented on issue #21172: URL: https://github.com/apache/datafusion/issues/21172#issuecomment-4333459693
We can also add the perf optimizations @comphead mentioned in the trait PR:

> I tried to build the `array_exists` Spark function using the current approach: https://github.com/apache/datafusion/pull/21881
>
> The good thing is that it works. From a performance point of view there are some improvements that can be made for lambdas, but we can likely address them in a follow-up. @gstvg, it is up to you.
>
> ```
> Loops — hoisting & fusion
>
> L1. Three passes over self.args in one evaluate call. datafusion/physical-expr/src/higher_order_function.rs:247, :269, :309 each iterate self.args,
>     and the first and third passes call self.lambda_positions.contains(&i) (a linear scan) inside the loop body. For N args with L lambdas,
>     that's O(N·L) per batch just to classify args — and the classification is already known at construction time.
>
>     - Fix: replace lambda_positions: Vec<usize> with a per-arg enum cached on the struct:
>           enum ArgSlot { Value(Arc<dyn PhysicalExpr>), Lambda(Arc<LambdaExpr>) }
>           slots: Vec<ArgSlot>,
>     - The three passes then become one fused loop: O(N) per batch, zero allocation for the lookup, and the slow wrapped_lambda tree walk
>       (:459-474) collapses to a cached Arc<LambdaExpr>.
>
> L2. Pass 2 is pure reshaping of pass 1 (arg_fields → fields, cloning Arcs). It can be folded into pass 1 by building the
>     Vec<ValueOrLambda<FieldRef, Option<FieldRef>>> directly.
>
> ---
> Data structures — precompute / augment
>
> D1. LambdaArgument::evaluate rebuilds the inner Schema on every call. datafusion/expr/src/higher_order_function.rs:256:
>         let schema = Arc::new(Schema::new(self.params.clone()));
>     Schema::new builds a name→index HashMap internally. For a nested HOF (array_transform([[1,2]], a -> array_transform(a, b -> ...)))
>     this runs once per outer sublist per batch. Build it once in LambdaArgument::new and store an Arc<Schema>.
>
> D2. self.fun.clear_null_values() is called once per non-lambda arg per batch (:349). It's a &self method returning a constant bool.
>     Cache it as a bool on HigherOrderFunctionExpr at construction.
>
> D3. wrapped_lambda walks the expr tree on every evaluate; :253 and :315 both call it. Pre-resolve the Arc<LambdaExpr> once in
>     try_new_with_schema / with_new_children and store it — lambdas don't change mid-execution.
>
> D4. HigherOrderFunctionArgs::args and ::arg_fields are always Vec, even for 2-arg HOFs (array_transform, array_exists, array_filter —
>     the vast majority). Switching to smallvec::SmallVec<[_; 2]> eliminates a heap alloc per batch for the common shape.
>
> ---
> Logic — short-circuit / reorder / lazy
>
> G1. lambda_parameters(0, &fields) runs unconditionally at :285 even when there are no lambdas. Guard it with
>     if self.lambda_positions.is_empty() { skip }.
>
> G2. conditional_arguments is specced but unused by the evaluator. The trait exposes short_circuits() + conditional_arguments(args) to mark
>     args as lazy, but HigherOrderFunctionExpr::evaluate still eagerly calls arg.evaluate(batch) for every non-lambda value (:346).
>     A short-circuiting array_exists/array_any can't benefit from skipping remaining work today. Wire this up.
>
> G3. The remove_list_null_values path (:349-355) is entered for every List/LargeList arg. The callee has a null_count == 0 fast path that
>     returns list.clone(), but you still pay an outer Arc::new + match. Add an array.null_count() == 0 short-circuit at the caller.
>
> G4. Constant-body lambdas aren't specialized. array_transform([..5], v -> 42) builds a RecordBatch and evaluates the body over all values
>     despite the body not referencing v. Detect "no LambdaVariable in body" at construction; evaluate the body on an empty batch once
>     and replicate.
>
> ---
> Expressions — word/SIMD parallelism & CSE
>
> E1. Per-row predicate scans are elementwise today (see my compute_exists in array_exists.rs and the array_transform pattern).
>     Replace the for i in start..end { if predicate.value(i) … } loops with Arrow's bit-level operations on the predicate's underlying
>     BooleanBuffer:
>
>         let row_slice = predicate.slice(start, end - start);
>         if row_slice.true_count() > 0 { TRUE }          // SIMD popcount
>         else if row_slice.null_count() > 0 { NULL }
>         else { FALSE }
>
>     The same trick applies to any boolean-reducing HOF (exists, forall, filter size estimation).
>
> E2. self.lambda_positions.contains(&i) is a repeated CSE target (L1 above covers it).
>
> ---
> Procedures — parallelism
>
> P1. Flat lambda evaluation over list_values is embarrassingly parallel. For per-row-independent HOFs (transform, exists, filter, any, forall),
>     list_values can be chunked, the lambda body evaluated in parallel per chunk, and the results concatenated, reusing Arrow's existing
>     kernels. This gates on detecting side-effect-free lambdas (e.g. !is_volatile_node()).
>
> P2. No HOF benchmarks exist (rg "HigherOrder" benches/ returns nothing). Before any of the above, add a Criterion bench for array_transform
>     over (List, LargeList, FixedSizeList) × (10K, 1M rows) × (scalar, columnar lambda body). Without baseline numbers, "faster" is
>     unmeasurable.
>
> ---
> Suggested priority order
>
> 1. D3 + L1 + L2 (cache ArgSlot, fold loops) — one edit; removes the O(N·L) classification and two Vec reallocs per batch.
> 2. D1 (cache the lambda Schema) — a one-line fix with a big win for nested HOFs.
> 3. E1 (SIMD popcount on the predicate's boolean buffer) — the biggest per-element win on array_exists / filter / forall.
> 4. P2 (add Criterion benches) — required to validate any of the above.
> 5. G2 (wire up conditional_arguments) — needed before short-circuiting HOFs like exists can skip work.
> ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
