kosiew commented on code in PR #21710:
URL: https://github.com/apache/datafusion/pull/21710#discussion_r3158787284
##########
datafusion/spark/src/function/math/ceil.rs:
##########
@@ -69,7 +90,12 @@ impl ScalarUDFImpl for SparkCeil {
 }
 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+    let has_scale = arg_types.len() == 2;
+
     match &arg_types[0] {
+        // 2-arg decimal is not yet supported; report input type so the planner
+        // does not reject the call before we surface a proper error at execution.
+        DataType::Decimal128(p, s) if has_scale => Ok(DataType::Decimal128(*p, *s)),
Review Comment:
Good progress here, but there is still a mismatch between planning and
execution for the 2-arg decimal case.
Right now return_type allows Decimal128(p, s), so the planner accepts the
query. Then execution always throws not_impl_err. That means users only see the
failure at runtime, which can be confusing.
It would be cleaner to fail early in return_type instead, so the planner
rejects it with a clear message. Something like:
    DataType::Decimal128(_, _) if has_scale => {
        not_impl_err!("2-argument ceil is not yet supported for decimal inputs")
    }
(using `not_impl_err!` rather than `exec_err!`, since this is a plan-time rejection of an unimplemented feature, not an execution failure)
This keeps the schema contract honest and improves the developer experience.
##########
datafusion/spark/src/function/math/ceil.rs:
##########
@@ -168,137 +336,18 @@ fn spark_ceil_array(input: &Arc<dyn arrow::array::Array>) -> Result<ColumnarValu
Ok(ColumnarValue::Array(result))
}
-#[cfg(test)]
-mod tests {
- use super::*;
-    use arrow::array::{Decimal128Array, Float32Array, Float64Array, Int64Array};
- use datafusion_common::ScalarValue;
-
- #[test]
- fn test_ceil_float64() {
- let input = Float64Array::from(vec![
- Some(125.2345),
- Some(15.0001),
- Some(0.1),
- Some(-0.9),
- Some(-1.1),
- Some(123.0),
- None,
- ]);
- let args = vec![ColumnarValue::Array(Arc::new(input))];
- let result = spark_ceil(&args).unwrap();
- let result = match result {
- ColumnarValue::Array(arr) => arr,
- _ => panic!("Expected array"),
- };
- let result = result.as_primitive::<Int64Type>();
- assert_eq!(
- result,
- &Int64Array::from(vec![
- Some(126),
- Some(16),
- Some(1),
- Some(0),
- Some(-1),
- Some(123),
- None,
- ])
- );
- }
-
- #[test]
- fn test_ceil_float32() {
- let input = Float32Array::from(vec![
- Some(125.2345f32),
- Some(15.0001f32),
- Some(0.1f32),
- Some(-0.9f32),
- Some(-1.1f32),
- Some(123.0f32),
- None,
- ]);
- let args = vec![ColumnarValue::Array(Arc::new(input))];
- let result = spark_ceil(&args).unwrap();
- let result = match result {
- ColumnarValue::Array(arr) => arr,
- _ => panic!("Expected array"),
- };
- let result = result.as_primitive::<Int64Type>();
- assert_eq!(
- result,
- &Int64Array::from(vec![
- Some(126),
- Some(16),
- Some(1),
- Some(0),
- Some(-1),
- Some(123),
- None,
- ])
- );
- }
-
- #[test]
- fn test_ceil_int64() {
- let input = Int64Array::from(vec![Some(1), Some(-1), None]);
- let args = vec![ColumnarValue::Array(Arc::new(input))];
- let result = spark_ceil(&args).unwrap();
- let result = match result {
- ColumnarValue::Array(arr) => arr,
- _ => panic!("Expected array"),
- };
- let result = result.as_primitive::<Int64Type>();
- assert_eq!(result, &Int64Array::from(vec![Some(1), Some(-1), None]));
- }
-
- #[test]
- fn test_ceil_decimal128() {
- // Decimal128(10, 2): 150 = 1.50, -150 = -1.50, 100 = 1.00
- let return_type = DataType::Decimal128(9, 0);
-        let input = Decimal128Array::from(vec![Some(150), Some(-150), Some(100), None])
- .with_data_type(DataType::Decimal128(10, 2));
- let args = vec![ColumnarValue::Array(Arc::new(input))];
- let result = spark_ceil(&args).unwrap();
- let result = match result {
- ColumnarValue::Array(arr) => arr,
- _ => panic!("Expected array"),
- };
- let result = result.as_primitive::<Decimal128Type>();
-        let expected = Decimal128Array::from(vec![Some(2), Some(-1), Some(1), None])
- .with_data_type(return_type);
- assert_eq!(result, &expected);
- }
-
- #[test]
- fn test_ceil_float64_scalar() {
- let input = ScalarValue::Float64(Some(-1.1));
- let args = vec![ColumnarValue::Scalar(input)];
- let result = match spark_ceil(&args).unwrap() {
- ColumnarValue::Scalar(v) => v,
- _ => panic!("Expected scalar"),
- };
- assert_eq!(result, ScalarValue::Int64(Some(-1)));
- }
-
- #[test]
- fn test_ceil_float32_scalar() {
- let input = ScalarValue::Float32(Some(125.2345f32));
- let args = vec![ColumnarValue::Scalar(input)];
- let result = match spark_ceil(&args).unwrap() {
- ColumnarValue::Scalar(v) => v,
- _ => panic!("Expected scalar"),
- };
- assert_eq!(result, ScalarValue::Int64(Some(126)));
- }
-
- #[test]
- fn test_ceil_int64_scalar() {
- let input = ScalarValue::Int64(Some(48));
- let args = vec![ColumnarValue::Scalar(input)];
- let result = match spark_ceil(&args).unwrap() {
- ColumnarValue::Scalar(v) => v,
- _ => panic!("Expected scalar"),
- };
- assert_eq!(result, ScalarValue::Int64(Some(48)));
+fn ceil_float<T: num_traits::Float>(value: T, scale: i32) -> T {
+ if scale >= 0 {
+ let factor = T::from(10.0f64.powi(scale)).unwrap_or_else(T::infinity);
+ if factor.is_infinite() {
+ return value;
+ }
+ (value * factor).ceil() / factor
+ } else {
+ let factor = T::from(10.0f64.powi(-scale)).unwrap_or_else(T::infinity);
Review Comment:
This edge case caught my eye. When scale is very negative and the factor
overflows, we silently return 0.0.
I see this matches the integer path, but for floats it is not obvious why
that is correct. It might help to add a short comment explaining the intent
here, especially that extremely coarse rounding collapses everything to zero
and that this is consistent with the integer behavior.
A tiny note would save future readers some head scratching.
##########
datafusion/spark/src/function/math/ceil.rs:
##########
@@ -153,7 +287,41 @@ fn spark_ceil_array(input: &Arc<dyn arrow::array::Array>) -> Result<ColumnarValu
.as_primitive::<Float64Type>()
.unary::<_, Int64Type>(|x| x.ceil() as i64),
) as _,
+        // 2-arg integer: widen to Int64 and apply scale-aware ceiling.
+        DataType::Int8 if has_scale => impl_integer_array_ceil!(input, Int8Type, scale),
+        DataType::Int16 if has_scale => impl_integer_array_ceil!(input, Int16Type, scale),
+        DataType::Int32 if has_scale => impl_integer_array_ceil!(input, Int32Type, scale),
+        DataType::Int64 if has_scale => impl_integer_array_ceil!(input, Int64Type, scale),
+        DataType::UInt8 if has_scale => impl_integer_array_ceil!(input, UInt8Type, scale),
+        DataType::UInt16 if has_scale => {
+            impl_integer_array_ceil!(input, UInt16Type, scale)
+        }
+        DataType::UInt32 if has_scale => {
+            impl_integer_array_ceil!(input, UInt32Type, scale)
+        }
+        DataType::UInt64 if has_scale => {
+            let array = input.as_primitive::<UInt64Type>();
Review Comment:
The error construction here works, but it is a bit hard to read.
The (exec_err!(...) as Result<(), _>).unwrap_err() pattern feels a little
roundabout. You could simplify this by mapping directly to a DataFusionError,
similar to how other paths handle it.
Something like:
let v = i64::try_from(x).map_err(|_| {
datafusion_common::exec_datafusion_err!(
"ceil: UInt64 value {x} exceeds i64::MAX and cannot be processed"
)
})?;
Same behavior, but easier on the eyes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]