kosiew commented on code in PR #21710:
URL: https://github.com/apache/datafusion/pull/21710#discussion_r3158787284


##########
datafusion/spark/src/function/math/ceil.rs:
##########
@@ -69,7 +90,12 @@ impl ScalarUDFImpl for SparkCeil {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let has_scale = arg_types.len() == 2;
+
         match &arg_types[0] {
+            // 2-arg decimal is not yet supported; report input type so the planner
+            // does not reject the call before we surface a proper error at execution.
+            DataType::Decimal128(p, s) if has_scale => Ok(DataType::Decimal128(*p, *s)),

Review Comment:
   Good progress here, but there is still a mismatch between planning and execution for the 2-arg decimal case.
   
   Right now return_type allows Decimal128(p, s), so the planner accepts the query. Then execution always throws not_impl_err. That means users only see the failure at runtime, which can be confusing.
   
   It would be cleaner to fail early in return_type instead, so the planner rejects it with a clear message. Something like:
   
   DataType::Decimal128(_, _) if has_scale => {
       exec_err!("2-argument ceil is not yet supported for decimal inputs")
   }
   
   This keeps the schema contract honest and improves the developer experience.
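   
   For context, a minimal sketch of the early rejection as a standalone helper (not the actual SparkCeil impl; everything except the decimal-with-scale arm is a placeholder):
   
   use arrow::datatypes::DataType;
   use datafusion_common::{exec_err, Result};
   
   fn ceil_return_type(arg_types: &[DataType]) -> Result<DataType> {
       let has_scale = arg_types.len() == 2;
       match &arg_types[0] {
           // Reject the unsupported 2-arg decimal form while planning instead
           // of deferring the error to execution.
           DataType::Decimal128(_, _) if has_scale => {
               exec_err!("2-argument ceil is not yet supported for decimal inputs")
           }
           // Placeholder: the remaining arms (floats, ints, 1-arg decimal) stay
           // exactly as they are in the diff above.
           other => Ok(other.clone()),
       }
   }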



##########
datafusion/spark/src/function/math/ceil.rs:
##########
@@ -168,137 +336,18 @@ fn spark_ceil_array(input: &Arc<dyn arrow::array::Array>) -> Result<ColumnarValu
     Ok(ColumnarValue::Array(result))
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use arrow::array::{Decimal128Array, Float32Array, Float64Array, Int64Array};
-    use datafusion_common::ScalarValue;
-
-    #[test]
-    fn test_ceil_float64() {
-        let input = Float64Array::from(vec![
-            Some(125.2345),
-            Some(15.0001),
-            Some(0.1),
-            Some(-0.9),
-            Some(-1.1),
-            Some(123.0),
-            None,
-        ]);
-        let args = vec![ColumnarValue::Array(Arc::new(input))];
-        let result = spark_ceil(&args).unwrap();
-        let result = match result {
-            ColumnarValue::Array(arr) => arr,
-            _ => panic!("Expected array"),
-        };
-        let result = result.as_primitive::<Int64Type>();
-        assert_eq!(
-            result,
-            &Int64Array::from(vec![
-                Some(126),
-                Some(16),
-                Some(1),
-                Some(0),
-                Some(-1),
-                Some(123),
-                None,
-            ])
-        );
-    }
-
-    #[test]
-    fn test_ceil_float32() {
-        let input = Float32Array::from(vec![
-            Some(125.2345f32),
-            Some(15.0001f32),
-            Some(0.1f32),
-            Some(-0.9f32),
-            Some(-1.1f32),
-            Some(123.0f32),
-            None,
-        ]);
-        let args = vec![ColumnarValue::Array(Arc::new(input))];
-        let result = spark_ceil(&args).unwrap();
-        let result = match result {
-            ColumnarValue::Array(arr) => arr,
-            _ => panic!("Expected array"),
-        };
-        let result = result.as_primitive::<Int64Type>();
-        assert_eq!(
-            result,
-            &Int64Array::from(vec![
-                Some(126),
-                Some(16),
-                Some(1),
-                Some(0),
-                Some(-1),
-                Some(123),
-                None,
-            ])
-        );
-    }
-
-    #[test]
-    fn test_ceil_int64() {
-        let input = Int64Array::from(vec![Some(1), Some(-1), None]);
-        let args = vec![ColumnarValue::Array(Arc::new(input))];
-        let result = spark_ceil(&args).unwrap();
-        let result = match result {
-            ColumnarValue::Array(arr) => arr,
-            _ => panic!("Expected array"),
-        };
-        let result = result.as_primitive::<Int64Type>();
-        assert_eq!(result, &Int64Array::from(vec![Some(1), Some(-1), None]));
-    }
-
-    #[test]
-    fn test_ceil_decimal128() {
-        // Decimal128(10, 2): 150 = 1.50, -150 = -1.50, 100 = 1.00
-        let return_type = DataType::Decimal128(9, 0);
-        let input = Decimal128Array::from(vec![Some(150), Some(-150), Some(100), None])
-            .with_data_type(DataType::Decimal128(10, 2));
-        let args = vec![ColumnarValue::Array(Arc::new(input))];
-        let result = spark_ceil(&args).unwrap();
-        let result = match result {
-            ColumnarValue::Array(arr) => arr,
-            _ => panic!("Expected array"),
-        };
-        let result = result.as_primitive::<Decimal128Type>();
-        let expected = Decimal128Array::from(vec![Some(2), Some(-1), Some(1), None])
-            .with_data_type(return_type);
-        assert_eq!(result, &expected);
-    }
-
-    #[test]
-    fn test_ceil_float64_scalar() {
-        let input = ScalarValue::Float64(Some(-1.1));
-        let args = vec![ColumnarValue::Scalar(input)];
-        let result = match spark_ceil(&args).unwrap() {
-            ColumnarValue::Scalar(v) => v,
-            _ => panic!("Expected scalar"),
-        };
-        assert_eq!(result, ScalarValue::Int64(Some(-1)));
-    }
-
-    #[test]
-    fn test_ceil_float32_scalar() {
-        let input = ScalarValue::Float32(Some(125.2345f32));
-        let args = vec![ColumnarValue::Scalar(input)];
-        let result = match spark_ceil(&args).unwrap() {
-            ColumnarValue::Scalar(v) => v,
-            _ => panic!("Expected scalar"),
-        };
-        assert_eq!(result, ScalarValue::Int64(Some(126)));
-    }
-
-    #[test]
-    fn test_ceil_int64_scalar() {
-        let input = ScalarValue::Int64(Some(48));
-        let args = vec![ColumnarValue::Scalar(input)];
-        let result = match spark_ceil(&args).unwrap() {
-            ColumnarValue::Scalar(v) => v,
-            _ => panic!("Expected scalar"),
-        };
-        assert_eq!(result, ScalarValue::Int64(Some(48)));
+fn ceil_float<T: num_traits::Float>(value: T, scale: i32) -> T {
+    if scale >= 0 {
+        let factor = T::from(10.0f64.powi(scale)).unwrap_or_else(T::infinity);
+        if factor.is_infinite() {
+            return value;
+        }
+        (value * factor).ceil() / factor
+    } else {
+        let factor = T::from(10.0f64.powi(-scale)).unwrap_or_else(T::infinity);

Review Comment:
   This edge case caught my eye. When scale is very negative and the factor overflows, we silently return 0.0.
   
   I see this matches the integer path, but for floats it is not obvious why that is correct. It might help to add a short comment explaining the intent here, especially that extremely coarse rounding collapses everything to zero and that this is consistent with the integer behavior.
   
   A tiny note would save future readers some head scratching.
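   
   Roughly the kind of note I have in mind, shown on a full sketch of the helper. I'm assuming the negative-scale branch (cut off in the hunk above) returns zero when the factor overflows, as described:
   
   fn ceil_float<T: num_traits::Float>(value: T, scale: i32) -> T {
       if scale >= 0 {
           let factor = T::from(10.0f64.powi(scale)).unwrap_or_else(T::infinity);
           if factor.is_infinite() {
               // The rounding unit is finer than the float can represent, so
               // the value is effectively already ceiled: return it unchanged.
               return value;
           }
           (value * factor).ceil() / factor
       } else {
           let factor = T::from(10.0f64.powi(-scale)).unwrap_or_else(T::infinity);
           if factor.is_infinite() {
               // Rounding this coarse collapses every finite input to zero;
               // this is deliberate and mirrors the integer path's behavior
               // for extreme negative scales.
               return T::zero();
           }
           (value / factor).ceil() * factor
       }
   }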



##########
datafusion/spark/src/function/math/ceil.rs:
##########
@@ -153,7 +287,41 @@ fn spark_ceil_array(input: &Arc<dyn arrow::array::Array>) -> Result<ColumnarValu
                 .as_primitive::<Float64Type>()
                 .unary::<_, Int64Type>(|x| x.ceil() as i64),
         ) as _,
+        // 2-arg integer: widen to Int64 and apply scale-aware ceiling.
+        DataType::Int8 if has_scale => impl_integer_array_ceil!(input, Int8Type, scale),
+        DataType::Int16 if has_scale => impl_integer_array_ceil!(input, Int16Type, scale),
+        DataType::Int32 if has_scale => impl_integer_array_ceil!(input, Int32Type, scale),
+        DataType::Int64 if has_scale => impl_integer_array_ceil!(input, Int64Type, scale),
+        DataType::UInt8 if has_scale => impl_integer_array_ceil!(input, UInt8Type, scale),
+        DataType::UInt16 if has_scale => {
+            impl_integer_array_ceil!(input, UInt16Type, scale)
+        }
+        DataType::UInt32 if has_scale => {
+            impl_integer_array_ceil!(input, UInt32Type, scale)
+        }
+        DataType::UInt64 if has_scale => {
+            let array = input.as_primitive::<UInt64Type>();

Review Comment:
   The error construction here works, but it is a bit hard to read.
   
   The (exec_err!(...) as Result<(), _>).unwrap_err() pattern feels a little roundabout. You could simplify this by mapping directly to a DataFusionError, similar to how other paths handle it.
   
   Something like:
   
   let v = i64::try_from(x).map_err(|_| {
       datafusion_common::exec_datafusion_err!(
           "ceil: UInt64 value {x} exceeds i64::MAX and cannot be processed"
       )
   })?;
   
   Same behavior, but easier on the eyes.
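   
   For completeness, the arm could then read roughly like this. try_unary and exec_datafusion_err! are existing APIs, but ceil_integer is a stand-in for whatever helper the macro dispatches to, so treat the shape as a sketch rather than a drop-in:
   
   DataType::UInt64 if has_scale => {
       let array = input.as_primitive::<UInt64Type>();
       let result: PrimitiveArray<Int64Type> = array.try_unary(|x| {
           // Values above i64::MAX cannot be widened; fail with a clear message.
           let v = i64::try_from(x).map_err(|_| {
               datafusion_common::exec_datafusion_err!(
                   "ceil: UInt64 value {x} exceeds i64::MAX and cannot be processed"
               )
           })?;
           // ceil_integer is hypothetical: the scale-aware integer ceiling.
           Ok(ceil_integer(v, scale))
       })?;
       Arc::new(result) as _
   }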


