tomz opened a new pull request, #4187:
URL: https://github.com/apache/datafusion-comet/pull/4187
# Draft Pull Request — apache/datafusion-comet
## Title
fix(spark-expr): preserve scalar tag in `WideDecimalBinaryExpr` when both inputs are scalars
## Closes
Closes #1615 (TPC-DS q23 BHJ scalar-subquery crash)
## Summary
`WideDecimalBinaryExpr::evaluate` currently always returns
`ColumnarValue::Array`, even when both inputs are `ColumnarValue::Scalar`.
Returning a length-1 `Array` instead of a `Scalar` drops the scalar tag, so
downstream expressions (binary ops, comparisons) take the array path through
arrow-rs kernels.
When the resulting length-1 array later meets a full-batch column in a
`>`/`<`/`=` comparison, arrow-ord's `compare_op` rejects it with:
```
Cannot compare arrays of different lengths, got 8192 vs 1
```
This crashes any plan in which a wide-decimal arithmetic result of two
scalars feeds into a row-level comparison. The clearest reproducer is **TPC-DS
q23**, whose plan contains the filter
```
cast((0.95 * Subquery#268) as decimal(38,2)) > ssales
```
`0.95 * scalar_subquery` is a `Scalar × Scalar` wide-decimal multiply; its result, mistagged as a length-1 array, is then compared against the 8192-row `ssales` column from the build side of the BroadcastHashJoin, and the executor crashes.
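To make the failure mode concrete, here is a minimal self-contained sketch with toy stand-ins (not the real DataFusion/arrow-rs types) showing how dropping the scalar tag turns a valid broadcast comparison into a length-mismatch error:

```rust
// Toy stand-ins for ColumnarValue and a length-checked comparison kernel.
// Illustrative only; the real types live in DataFusion and arrow-rs.
#[derive(Debug)]
enum ColumnarValue {
    Array(Vec<i128>), // a column of values
    Scalar(i128),     // a single value, broadcast over any batch length
}

// Mimics arrow-ord's behavior: array-vs-array requires equal lengths,
// while a scalar operand broadcasts and always succeeds.
fn gt(lhs: &ColumnarValue, rhs: &ColumnarValue) -> Result<Vec<bool>, String> {
    use ColumnarValue::*;
    match (lhs, rhs) {
        (Array(a), Array(b)) => {
            if a.len() != b.len() {
                return Err(format!(
                    "Cannot compare arrays of different lengths, got {} vs {}",
                    a.len(),
                    b.len()
                ));
            }
            Ok(a.iter().zip(b).map(|(x, y)| x > y).collect())
        }
        (Scalar(s), Array(b)) => Ok(b.iter().map(|y| s > y).collect()),
        (Array(a), Scalar(s)) => Ok(a.iter().map(|x| x > s).collect()),
        (Scalar(a), Scalar(b)) => Ok(vec![a > b]),
    }
}

fn main() {
    let ssales = ColumnarValue::Array((0..8192).collect());

    // Bug: a (Scalar op Scalar) result returned as a length-1 Array fails...
    let mistagged = ColumnarValue::Array(vec![42]);
    assert!(gt(&mistagged, &ssales).is_err());

    // ...while the same value with its scalar tag intact broadcasts fine.
    let tagged = ColumnarValue::Scalar(42);
    assert_eq!(gt(&tagged, &ssales).unwrap().len(), 8192);
}
```

The key point the model captures: the array-vs-array arm has no broadcast rule, so once the scalar tag is lost, a length-1 operand is just a mismatched array.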
## Fix
Track scalar-ness of both inputs at the top of `evaluate`, and at the end,
if both inputs were scalars, unwrap the length-1 result back into a
`ColumnarValue::Scalar` via `ScalarValue::try_from_array(&result, 0)`.
```diff
+        // Track scalar-ness so we can return a Scalar when both inputs are scalars.
+        // Without this, a (Scalar op Scalar) result is returned as a length-1 Array,
+        // and downstream comparisons against full batches incorrectly see two Array
+        // operands with mismatched lengths instead of (Array, Scalar).
+        let both_scalar = matches!(
+            (&left_val, &right_val),
+            (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
+        );
         let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (&left_val, &right_val) {
             ...
         };
         ...
         let result = result.with_data_type(DataType::Decimal128(p_out, s_out));
-        Ok(ColumnarValue::Array(Arc::new(result)))
+        if both_scalar {
+            let scalar = datafusion::common::ScalarValue::try_from_array(&result, 0)?;
+            Ok(ColumnarValue::Scalar(scalar))
+        } else {
+            Ok(ColumnarValue::Array(Arc::new(result)))
+        }
```
Three lines of real logic; the rest is a comment block explaining the bug.
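Outside the diff context, the shape of the fix can be sketched as a self-contained function. This is a toy model with `i128` values and plain `Vec`s standing in for Arrow arrays; the real code does the unwrap via `ScalarValue::try_from_array(&result, 0)?`:

```rust
// Simplified model of the fixed evaluate: compute over arrays internally,
// but preserve the Scalar tag when both inputs carried it.
#[derive(Debug, PartialEq)]
enum ColumnarValue {
    Array(Vec<i128>),
    Scalar(i128),
}

fn evaluate(left: &ColumnarValue, right: &ColumnarValue) -> ColumnarValue {
    use ColumnarValue::*;
    let both_scalar = matches!((left, right), (Scalar(_), Scalar(_)));

    // Broadcast any scalar operand to the batch length, as the kernel does.
    let len = match (left, right) {
        (Array(a), _) => a.len(),
        (_, Array(b)) => b.len(),
        _ => 1,
    };
    let expand = |v: &ColumnarValue| -> Vec<i128> {
        match v {
            Array(a) => a.clone(),
            Scalar(s) => vec![*s; len],
        }
    };
    let (l, r) = (expand(left), expand(right));
    // Elementwise multiply stands in for the wide-decimal arithmetic kernel.
    let result: Vec<i128> = l.iter().zip(r.iter()).map(|(x, y)| x * y).collect();

    if both_scalar {
        Scalar(result[0]) // the unwrap step this patch adds
    } else {
        Array(result)
    }
}

fn main() {
    use ColumnarValue::*;
    // Scalar x Scalar stays Scalar after the fix...
    assert_eq!(evaluate(&Scalar(3), &Scalar(4)), Scalar(12));
    // ...while any Array operand still yields an Array.
    assert_eq!(evaluate(&Array(vec![1, 2]), &Scalar(3)), Array(vec![3, 6]));
}
```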
## Tests
Added a unit test `test_scalar_scalar_returns_scalar` to
`wide_decimal_binary_expr.rs` covering:
1. `Scalar × Scalar` decimal multiply returns `ColumnarValue::Scalar`, not
`Array`.
2. Existing `Array op X` and `X op Array` paths still return `Array`
(regression guard).
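In spirit, the new test asserts on the `ColumnarValue` variant, roughly like this sketch (toy enum and a stub `evaluate`; the real test constructs `Decimal128` inputs and calls `WideDecimalBinaryExpr::evaluate`):

```rust
#[derive(Debug)]
enum ColumnarValue {
    Array(Vec<i128>),
    Scalar(i128),
}

// Stub with the fixed behavior: Scalar only when both inputs are Scalar.
fn evaluate(l: &ColumnarValue, r: &ColumnarValue) -> ColumnarValue {
    use ColumnarValue::*;
    match (l, r) {
        (Scalar(a), Scalar(b)) => Scalar(a * b),
        _ => Array(vec![0]), // placeholder; real code returns the computed array
    }
}

fn main() {
    use ColumnarValue::*;
    // 1. Scalar x Scalar must come back tagged as Scalar, not a length-1 Array.
    assert!(matches!(evaluate(&Scalar(95), &Scalar(100)), Scalar(_)));

    // 2. Regression guard: any Array operand must still produce an Array.
    let arr = Array(vec![1, 2, 3]);
    assert!(matches!(evaluate(&arr, &Scalar(2)), Array(_)));
    assert!(matches!(evaluate(&Scalar(2), &arr), Array(_)));
}
```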
End-to-end: TPC-DS q23 now passes at SF1 and SF10 (previously crashed in
both).
## Benchmark Results (TPC-DS, 10 cores, Spark 4.2.0-SNAPSHOT)
| | Before fix | After fix |
|---|---|---|
| **SF1 pass rate** | 98 / 99 (q23 crash) | **99 / 99** |
| **SF1 total time** | 279.9s | **259.9s** (1.08× faster) |
| **SF1 q14 time** | 32.8s | **10.1s** (3.2× faster) |
| **SF10 pass rate** | 98 / 99 (q23 crash) | **99 / 99** (3 consecutive runs) |
| **SF10 q23 time** | crash | 16.3 / 17.0 / 17.7s (consistent) |
The q14 speedup is a bonus — q14 has the same `Scalar × Scalar` decimal
pattern in its filter, and previously paid a heavy materialization cost from
the bogus length-1 array path.
## Why this didn't trip earlier tests
`WideDecimalBinaryExpr` is only chosen when output precision ≥ 38. Most
existing tests evaluate it against `RecordBatch` inputs (i.e. `Array` operands
via `Column`), so the `Scalar × Scalar` branch was never exercised. The bug
only surfaces when the planner produces both-scalar inputs, which happens with
`literal × scalar_subquery` — exactly q23's pattern.
## Risk
Minimal:
- No change to numeric semantics; the result bytes are unchanged.
- Behavioral change is strictly in the *wrapping* of the result (`Scalar` vs
`Array`).
- All callers downstream already handle `ColumnarValue::Scalar` correctly —
that's the normal contract.
## Files changed
- `native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs` (+~16 lines
incl. comments + 1 unit test)
## Suggested commit message
```
fix(spark-expr): preserve scalar tag in WideDecimalBinaryExpr when both inputs are scalars

WideDecimalBinaryExpr::evaluate always returned ColumnarValue::Array,
even when both inputs were Scalar. The resulting length-1 array lost
its scalar tag, so a downstream comparison against a full batch hit
arrow-ord's "Cannot compare arrays of different lengths, got 8192 vs 1".

This is the root cause of the TPC-DS q23 BroadcastHashJoin crash
(issue #1615): the filter `0.95 * scalar_subquery > ssales` produces
a Scalar × Scalar decimal multiply whose length-1 result was then
compared against the 8192-row ssales column.

Detect the both-scalar case and unwrap the length-1 result back into
a ColumnarValue::Scalar so downstream kernels take the scalar fast-path.

Adds a unit test for the both-scalar path.

Closes #1615
```